diff --git a/Jenkinsfile b/Jenkinsfile index 768d0746..087c0c64 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -22,6 +22,8 @@ pipeline { // 镜像仓库配置(Infra 服务器内网地址,Prod 服务器可通过内网拉取) REGISTRY = '172.17.16.7:5000' + REGISTRY_HOST = '172.17.16.7' + REGISTRY_CONTAINER = 'registry' DEPS_IMAGE = "${REGISTRY}/aiot-deps:latest" // 服务配置 @@ -36,6 +38,13 @@ pipeline { STAGING_DEPLOY_HOST = '172.17.16.7' STAGING_DEPLOY_PATH = '/opt/aiot-platform-cloud' + // 磁盘守护阈值(%):低于 MIN 直接 fail;低于 WARN 仅告警 + DISK_FREE_MIN_PCT = '5' + DISK_FREE_WARN_PCT = '10' + + // 镜像保留份数(每服务) + IMAGE_KEEP_COUNT = '3' + // 性能配置 - 将动态调整 BUILD_TIMEOUT = 45 DEPLOY_TIMEOUT = 10 @@ -270,6 +279,30 @@ pipeline { } } + stage('Pre-deploy Check') { + when { + allOf { + expression { env.SERVICES_TO_BUILD != '' } + anyOf { + branch 'master' + branch 'release/next' + } + } + } + steps { + script { + def stageStartTime = System.currentTimeMillis() + echo "🛡️ Pre-deploy health check: remote disk & SSH reachability" + + // 检查 Prod 与 Registry 两台主机的磁盘,低于阈值 fail fast,避免 sshd 在磁盘满时被拖垮 + checkRemoteDiskOrFail(env.DEPLOY_HOST, 'Deploy') + checkRemoteDiskOrFail(env.REGISTRY_HOST, 'Registry') + + recordStageMetrics('Pre-deploy Check', stageStartTime) + } + } + } + stage('Deploy') { when { allOf { @@ -283,7 +316,7 @@ pipeline { steps { script { def stageStartTime = System.currentTimeMillis() - + def servicesToDeploy = env.SERVICES_TO_BUILD.split(',') def sortedServices = sortServicesByDependency(servicesToDeploy) @@ -375,6 +408,32 @@ pipeline { } } } + + stage('Cleanup Old Images') { + when { + allOf { + expression { env.SERVICES_TO_BUILD != '' } + anyOf { + branch 'master' + branch 'release/next' + } + } + } + steps { + script { + def stageStartTime = System.currentTimeMillis() + echo "🧹 Cleaning up old images (keep=${env.IMAGE_KEEP_COUNT})" + + // Prod/Staging 本地:清旧镜像 + dangling + builder cache + cleanupDeployHost(env.DEPLOY_HOST, env.IMAGE_KEEP_COUNT) + + // Registry:按保留策略删 manifest + 触发 GC 释放磁盘 + cleanupRegistry(env.REGISTRY_HOST, env.REGISTRY_CONTAINER, env.IMAGE_KEEP_COUNT) + + recordStageMetrics('Cleanup Old Images', stageStartTime) + } + } + } } post { @@ -1073,3 +1132,73 @@ def getModulePathForService(String service) { ] return map.get(service, service) } + +// ============================================ +// 磁盘/清理相关 helper(Prod + Registry) +// ============================================ + +// 检查远端磁盘剩余百分比;低于 MIN 阈值直接 fail;低于 WARN 仅告警 +def checkRemoteDiskOrFail(String host, String role) { + def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}" + def minPct = Integer.parseInt(env.DISK_FREE_MIN_PCT) + def warnPct = Integer.parseInt(env.DISK_FREE_WARN_PCT) + + def freePct + try { + // awk 的 $5+0 会把 "42%" 强转为 42,避免多层 gsub 转义 + freePct = sh( + script: "ssh ${sshOpts} root@${host} \"df -P / | awk 'NR==2 { print 100 - \\\$5+0 }'\"", + returnStdout: true + ).trim() as int + } catch (Exception e) { + error("❌ [${role}] 无法通过 SSH 到 ${host} 读取磁盘(可能 sshd 已被磁盘满拖垮):${e.message}") + } + + echo " ${role}@${host}: 根分区空闲 ${freePct}%" + if (freePct < minPct) { + error("❌ [${role}] ${host} 根分区空闲仅 ${freePct}% < ${minPct}%,终止部署避免二次失败。请先手动清理或跑 scripts/cleanup.sh") + } else if (freePct < warnPct) { + echo "⚠️ [${role}] ${host} 空闲 ${freePct}% < ${warnPct}%,本次部署后会触发自动清理" + } +} + +// Prod/Staging 本地清理:调用仓库内的 cleanup.sh +def cleanupDeployHost(String host, String keep) { + def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}" + try { + echo "📤 Syncing scripts/cleanup.sh to ${host}..." + sh "scp ${sshOpts} scripts/cleanup.sh root@${host}:${env.DEPLOY_PATH}/cleanup.sh" + sh """ + ssh ${sshOpts} root@${host} ' + cd ${env.DEPLOY_PATH} + chmod +x cleanup.sh + ./cleanup.sh --keep=${keep} --registry=${env.REGISTRY} + ' + """ + echo "✅ Deploy host 清理完成" + } catch (Exception e) { + // 清理失败不影响发布结果,仅告警 + echo "⚠️ Deploy host 清理失败(不致命):${e.message}" + } +} + +// Registry 清理:同步 python 脚本 → 按保留策略删 manifest → GC +def cleanupRegistry(String host, String registryContainer, String keep) { + def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}" + try { + echo "📤 Syncing scripts/registry-cleanup.py to ${host}..." + sh "scp ${sshOpts} scripts/registry-cleanup.py root@${host}:/tmp/registry-cleanup.py" + sh """ + ssh ${sshOpts} root@${host} ' + python3 /tmp/registry-cleanup.py \ + --registry http://localhost:5000 \ + --keep ${keep} \ + --repos ${env.CORE_SERVICES} \ + --gc-container ${registryContainer} + ' + """ + echo "✅ Registry 清理 + GC 完成" + } catch (Exception e) { + echo "⚠️ Registry 清理失败(不致命):${e.message}" + } +} diff --git a/scripts/cleanup.sh b/scripts/cleanup.sh index 5107e117..3cddf919 100644 --- a/scripts/cleanup.sh +++ b/scripts/cleanup.sh @@ -2,80 +2,122 @@ # ============================================ # AIOT Platform - 清理脚本 -# 清理旧镜像和容器,释放存储空间 +# 清理部署主机上的旧镜像 / 停止容器 / 构建缓存,释放存储空间 # ============================================ set -e +# ---- 默认参数 ---- +KEEP=3 +PRUNE_VOLUMES=false +REGISTRY_HOST="localhost:5000" + +usage() { + cat </dev/null || true + + # 按创建时间降序取 ID 列表,跳过 latest tag,保留前 KEEP 个 + mapfile -t ids_to_delete < <( + docker images "${REGISTRY_HOST}/${service}" \ + --format '{{.CreatedAt}}|{{.ID}}|{{.Tag}}' \ + | grep -v '|latest$' \ + | sort -r \ + | awk -F'|' -v k="$KEEP" 'NR > k {print $2}' + ) + + if [ "${#ids_to_delete[@]}" -eq 0 ]; then + log_info " └─ 无可清理镜像" + continue + fi + + log_info " └─ 删除 ${#ids_to_delete[@]} 个旧镜像" + # 去重后批量删 + printf '%s\n' "${ids_to_delete[@]}" | sort -u | xargs -r docker rmi -f 2>/dev/null || true done -# 清理未使用的卷(谨慎使用) -log_warn "是否清理未使用的卷? (y/N)" -read -r response -if [ "$response" = "y" ] || [ "$response" = "Y" ]; then - log_info "清理未使用的卷..." +if [ "$PRUNE_VOLUMES" = true ]; then + log_warn "清理未使用的 volume(--prune-volumes 已启用)" docker volume prune -f +else + log_info "跳过 volume 清理(如需清理请加 --prune-volumes)" fi -# 清理构建缓存 -log_info "清理 Docker 构建缓存..." -docker builder prune -f --filter "until=24h" +log_info "清理 Docker 构建缓存(24h 前)..." +docker builder prune -f --filter "until=24h" || true -# 显示清理后的磁盘使用情况 echo "" log_info "=========================================" log_info "清理完成" log_info "=========================================" echo "" log_info "清理后磁盘使用情况:" -df -h | grep -E "Filesystem|/$" +df -h | grep -E "Filesystem|/$" || true echo "" log_info "清理后 Docker 磁盘使用:" docker system df diff --git a/scripts/registry-cleanup.py b/scripts/registry-cleanup.py new file mode 100644 index 00000000..20f1dcc5 --- /dev/null +++ b/scripts/registry-cleanup.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 +""" +Docker Registry 镜像清理工具 + +按 tag 语义/时间排序,每个仓库保留最近 N 个版本(默认 3),其余逻辑删除。 +支持可选触发容器内 `registry garbage-collect` 真正回收磁盘空间。 + +典型用法(在 Registry 宿主机上执行): + # 保留最近 3 个 + python3 registry-cleanup.py + + # 指定仓库列表 + GC + python3 registry-cleanup.py \\ + --registry http://localhost:5000 \\ + --keep 3 \\ + --repos viewsh-gateway,viewsh-module-iot-server \\ + --gc-container registry + + # 空跑查看计划(不执行删除) + python3 registry-cleanup.py --dry-run + +退出码:0=成功 / 1=参数错误 / 2=Registry 不可达 / 3=部分仓库处理失败 +""" + +import argparse +import json +import re +import subprocess +import sys +import urllib.error +import urllib.request +from typing import List, Optional, Tuple + +DEFAULT_REGISTRY = "http://localhost:5000" +DEFAULT_KEEP = 3 +DEFAULT_REPOS = [ + "viewsh-gateway", + "viewsh-module-infra-server", + "viewsh-module-iot-gateway", + "viewsh-module-iot-server", + "viewsh-module-ops-server", + "viewsh-module-system-server", +] + +# 覆盖常见的 manifest 媒体类型,避免 BuildKit/OCI 推的 tag 取不到 digest +MANIFEST_ACCEPT = ", ".join([ + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.docker.distribution.manifest.list.v2+json", + "application/vnd.oci.image.manifest.v1+json", + "application/vnd.oci.image.index.v1+json", +]) + + +def http_request(method: str, url: str, headers: Optional[dict] = None, timeout: int = 10): + req = urllib.request.Request(url, method=method, headers=headers or {}) + return urllib.request.urlopen(req, timeout=timeout) + + +def list_tags(registry: str, repo: str) -> List[str]: + url = f"{registry}/v2/{repo}/tags/list" + try: + with http_request("GET", url) as resp: + body = resp.read().decode("utf-8") + except urllib.error.HTTPError as e: + if e.code == 404: + return [] + raise + data = json.loads(body) + return [t for t in (data.get("tags") or []) if t != "latest"] + + +def get_manifest_digest(registry: str, repo: str, tag: str) -> Optional[str]: + url = f"{registry}/v2/{repo}/manifests/{tag}" + try: + with http_request("HEAD", url, headers={"Accept": MANIFEST_ACCEPT}) as resp: + return resp.headers.get("Docker-Content-Digest") + except urllib.error.HTTPError: + return None + + +def delete_manifest(registry: str, repo: str, digest: str) -> bool: + url = f"{registry}/v2/{repo}/manifests/{digest}" + try: + with http_request("DELETE", url) as resp: + return 200 <= resp.status < 300 + except urllib.error.HTTPError as e: + # 202 Accepted 会抛 HTTPError,特殊处理 + return 200 <= e.code < 300 + + +# tag 排序:优先按"数字/build 号"降序,其次按字典序降序 +_BUILD_NUM_RE = re.compile(r"(\d+)") + + +def tag_sort_key(tag: str) -> Tuple[int, str]: + """ + 返回 (数字, 原始tag) 供降序排序使用。 + - 若 tag 包含数字(如 build-123、1.2.3、20260424103000),取最后一个数字段作主排序键 + - 否则数字位 = -1,只用字符串兜底 + """ + nums = _BUILD_NUM_RE.findall(tag) + primary = int(nums[-1]) if nums else -1 + return (primary, tag) + + +def cleanup_repo( + registry: str, + repo: str, + keep: int, + dry_run: bool, +) -> Tuple[int, int, int]: + """ + 返回 (total, deleted, skipped) + """ + tags = list_tags(registry, repo) + total = len(tags) + if total == 0: + print(f" └─ 无 tag,跳过") + return 0, 0, 0 + + # 降序:越新的越靠前 + tags.sort(key=tag_sort_key, reverse=True) + keep_list = tags[:keep] + delete_list = tags[keep:] + + print(f" ├─ 共 {total} 个 tag,保留最新 {len(keep_list)} 个:{keep_list}") + if not delete_list: + print(f" └─ 无需删除") + return total, 0, 0 + + deleted = 0 + skipped = 0 + # 去重:多个 tag 可能指向同一 digest,只需要 DELETE 一次 + seen_digests = set() + for tag in delete_list: + digest = get_manifest_digest(registry, repo, tag) + if not digest: + print(f" │ [SKIP] {tag:30s} (digest 取不到)") + skipped += 1 + continue + if digest in seen_digests: + print(f" │ [DEDUP] {tag:30s} {digest[:19]}...") + continue + seen_digests.add(digest) + if dry_run: + print(f" │ [DRY] {tag:30s} {digest[:19]}...") + deleted += 1 + continue + ok = delete_manifest(registry, repo, digest) + if ok: + print(f" │ [DELETE] {tag:30s} {digest[:19]}...") + deleted += 1 + else: + print(f" │ [FAIL] {tag:30s} {digest[:19]}...") + skipped += 1 + + print(f" └─ 已删除 {deleted},跳过 {skipped}") + return total, deleted, skipped + + +def run_gc(container: str, dry_run: bool) -> bool: + cmd = [ + "docker", "exec", container, + "registry", "garbage-collect", + "--delete-untagged=true", + "/etc/docker/registry/config.yml", + ] + print(f"\n🧹 触发 Registry GC:{' '.join(cmd)}") + if dry_run: + print(" (--dry-run 已跳过实际执行)") + return True + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + except subprocess.TimeoutExpired: + print(" ❌ GC 超时(>5min)") + return False + except FileNotFoundError: + print(" ❌ 找不到 docker 命令,确认脚本跑在 Registry 宿主机") + return False + if result.returncode == 0: + # GC 输出可能很长,只打 tail + tail = "\n".join(result.stdout.splitlines()[-10:]) + print(f" ✅ GC 成功(输出末 10 行):\n{tail}") + return True + print(f" ❌ GC 失败 (rc={result.returncode})") + if result.stderr: + print(f" stderr: {result.stderr.strip()[:500]}") + return False + + +def parse_args(): + p = argparse.ArgumentParser(description="Docker Registry 镜像清理") + p.add_argument("--registry", default=DEFAULT_REGISTRY, help=f"Registry URL(默认 {DEFAULT_REGISTRY})") + p.add_argument("--keep", type=int, default=DEFAULT_KEEP, help=f"每仓库保留版本数(默认 {DEFAULT_KEEP})") + p.add_argument("--repos", default=",".join(DEFAULT_REPOS), + help="逗号分隔的仓库名列表(默认:内置服务清单)") + p.add_argument("--gc-container", default=None, + help="Registry 容器名;指定则删除完后触发 garbage-collect") + p.add_argument("--dry-run", action="store_true", help="只打印计划,不实际删除") + return p.parse_args() + + +def main(): + args = parse_args() + if args.keep < 1: + print("❌ --keep 必须 >= 1", file=sys.stderr) + return 1 + + repos = [r.strip() for r in args.repos.split(",") if r.strip()] + if not repos: + print("❌ 仓库列表为空", file=sys.stderr) + return 1 + + # 连通性检查 + try: + with http_request("GET", f"{args.registry}/v2/", timeout=5): + pass + except (urllib.error.URLError, urllib.error.HTTPError) as e: + # /v2/ 返回 401 也算通(部分 Registry 开启认证) + if not (isinstance(e, urllib.error.HTTPError) and e.code == 401): + print(f"❌ Registry 不可达:{args.registry} — {e}", file=sys.stderr) + return 2 + + print(f"🎯 Registry={args.registry} keep={args.keep} dry_run={args.dry_run}") + print(f"📦 仓库:{repos}\n") + + overall_total = overall_deleted = overall_skipped = 0 + failed_repos = [] + for repo in repos: + print(f"=== {repo} ===") + try: + t, d, s = cleanup_repo(args.registry, repo, args.keep, args.dry_run) + overall_total += t + overall_deleted += d + overall_skipped += s + except Exception as e: + print(f" ❌ 处理异常:{e}") + failed_repos.append(repo) + + print(f"\n📊 总计:扫描 {overall_total} / 删除 {overall_deleted} / 跳过 {overall_skipped}") + if failed_repos: + print(f"⚠️ 失败仓库:{failed_repos}") + + # GC + if args.gc_container and overall_deleted > 0: + ok = run_gc(args.gc_container, args.dry_run) + if not ok: + return 3 + elif args.gc_container: + print("\n🟡 无 manifest 被删除,跳过 GC") + else: + print("\n💡 未指定 --gc-container,逻辑删除完成但磁盘尚未释放;") + print(" 请在 Registry 宿主机手动执行:") + print(f" docker exec registry garbage-collect \\") + print(f" --delete-untagged=true /etc/docker/registry/config.yml") + + return 3 if failed_repos else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java index acf13bf1..59a163b9 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java @@ -10,6 +10,7 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.Map; /** @@ -60,6 +61,7 @@ public class CleanRuleProcessorManager { } Long deviceId = message.getDeviceId(); + LocalDateTime reportTime = message.getReportTime(); @SuppressWarnings("unchecked") Map data = (Map) message.getParams(); @@ -73,7 +75,7 @@ public class CleanRuleProcessorManager { } else { // 属性上报:直接遍历 key-value data.forEach((identifier, value) -> - processDataSafely(deviceId, identifier, value)); + processDataSafely(deviceId, identifier, value, reportTime)); // 4. 蓝牙信号缺失补偿:当设备上报了属性但不含 bluetoothDevices 时, // 主动注入一次 null 调用,使 BeaconDetectionRuleProcessor 能写入 -999(信号缺失), @@ -81,7 +83,7 @@ public class CleanRuleProcessorManager { if (!data.containsKey("bluetoothDevices")) { beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null); // 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999 - trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null); + trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null, reportTime); } } } @@ -127,7 +129,7 @@ public class CleanRuleProcessorManager { * @param identifier 标识符 * @param value 数据值 */ - private void processDataSafely(Long deviceId, String identifier, Object value) { + private void processDataSafely(Long deviceId, String identifier, Object value, LocalDateTime reportTime) { try { switch (identifier) { case "people_in", "people_out" -> @@ -135,7 +137,7 @@ public class CleanRuleProcessorManager { case "bluetoothDevices" -> { beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value); // 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon - trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value); + trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value, reportTime); } default -> { // 其他属性/事件忽略 diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java index b6fe4b8f..933e9754 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Component; import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.TimeUnit; @@ -57,6 +58,17 @@ public class TrajectoryDetectionProcessor { private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s"; private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时 + /** + * 最小停留时长(毫秒):进入区域后至少停留这么久,才允许发布 LEAVE/AREA_SWITCH, + * 用于过滤 RSSI 抖动和批量消息回放导致的瞬态切换 + */ + private static final long MIN_STAY_MILLIS = 5_000L; + + /** + * 区域切换滞回阈值(dB):候选区域 RSSI 必须比当前区域 RSSI 高出此值,才允许 AREA_SWITCH + */ + private static final int RSSI_HYSTERESIS_DB = 5; + @Resource private BeaconRegistryService beaconRegistryService; @@ -84,11 +96,20 @@ public class TrajectoryDetectionProcessor { * @param deviceId 设备ID(工牌) * @param identifier 属性标识符 * @param propertyValue 蓝牙设备列表 + * @param reportTime 设备上报时间(可为 null,为空则用当前时间兜底) */ - public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { + public void processPropertyChange(Long deviceId, String identifier, Object propertyValue, + LocalDateTime reportTime) { if (!"bluetoothDevices".equals(identifier)) { return; } + LocalDateTime eventTime; + if (reportTime != null) { + eventTime = reportTime; + } else { + eventTime = LocalDateTime.now(); + log.warn("[Trajectory] reportTime 为空,使用当前时间兜底:deviceId={}(若频繁出现请排查上游调用链)", deviceId); + } // 1. 检查设备是否开启轨迹功能 if (!isTrajectoryEnabled(deviceId)) { @@ -125,9 +146,9 @@ public class TrajectoryDetectionProcessor { // 8. 处理区域状态变化 if (currentArea != null) { - processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex); + processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex, eventTime); } else { - processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex); + processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime); } } @@ -139,8 +160,11 @@ public class TrajectoryDetectionProcessor { private void processWithCurrentArea(Long deviceId, TrajectoryStateRedisDAO.CurrentAreaInfo currentArea, Map matchedBeacons, - Map areaConfigIndex) { + Map areaConfigIndex, + LocalDateTime eventTime) { Long currentAreaId = currentArea.getAreaId(); + long stayMillis = System.currentTimeMillis() - (currentArea.getEnterTime() != null ? currentArea.getEnterTime() : 0L); + boolean minStayReached = stayMillis >= MIN_STAY_MILLIS; // 6a. 检查当前区域的退出条件 BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId); @@ -153,16 +177,22 @@ public class TrajectoryDetectionProcessor { AreaState.IN_AREA); if (exitResult == DetectionResult.LEAVE_CONFIRMED) { + if (!minStayReached) { + // 未达到最小停留时长,视为瞬态抖动,忽略本次离开 + log.debug("[Trajectory] 未达最小停留,忽略 LEAVE:deviceId={}, areaId={}, stayMs={}", + deviceId, currentAreaId, stayMillis); + return; + } // 确认离开当前区域 publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(), - "SIGNAL_LOSS", currentArea.getEnterTime()); + "SIGNAL_LOSS", currentArea.getEnterTime(), eventTime); stateRedisDAO.clearCurrentArea(deviceId); windowRedisDAO.clearWindow(deviceId, currentAreaId); log.info("[Trajectory] 离开区域:deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId); // 离开后,尝试进入新区域 - processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex); + processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime); return; } } @@ -170,15 +200,29 @@ public class TrajectoryDetectionProcessor { // 6b. 当前区域未退出,检查是否有更强区域触发切换 MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId); if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) { + if (!minStayReached) { + log.debug("[Trajectory] 未达最小停留,忽略 AREA_SWITCH:deviceId={}, from={}, to={}, stayMs={}", + deviceId, currentAreaId, bestCandidate.areaId, stayMillis); + return; + } + // 切换滞回:候选 RSSI 必须显著强于当前区域 RSSI + // 优先取本次匹配值;若当前未匹配到,回退到窗口里最近一次非缺失(-999)样本, + // 避免当前信标短暂漏扫时滞回被 -999 哨兵破坏 + int currentRssi = resolveCurrentAreaRssi(deviceId, currentAreaId, matchedBeacons); + if (bestCandidate.rssi - currentRssi < RSSI_HYSTERESIS_DB) { + log.debug("[Trajectory] 未达滞回阈值,忽略 AREA_SWITCH:deviceId={}, from={}({}dBm), to={}({}dBm)", + deviceId, currentAreaId, currentRssi, bestCandidate.areaId, bestCandidate.rssi); + return; + } // 区域切换:先离开当前区域,再进入新区域 publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(), - "AREA_SWITCH", currentArea.getEnterTime()); + "AREA_SWITCH", currentArea.getEnterTime(), eventTime); windowRedisDAO.clearWindow(deviceId, currentAreaId); long now = System.currentTimeMillis(); stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac); windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId); - publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi); + publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime); log.info("[Trajectory] 区域切换:deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId); } @@ -189,13 +233,14 @@ public class TrajectoryDetectionProcessor { */ private void processWithoutCurrentArea(Long deviceId, Map matchedBeacons, - Map areaConfigIndex) { + Map areaConfigIndex, + LocalDateTime eventTime) { MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null); if (bestCandidate != null) { long now = System.currentTimeMillis(); stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac); windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId); - publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi); + publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime); log.info("[Trajectory] 进入区域:deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi); } @@ -282,6 +327,31 @@ public class TrajectoryDetectionProcessor { } } + /** + * 解析当前区域用于滞回判断的参考 RSSI: + * 1) 本次上报匹配到了当前区域 → 直接用本次 RSSI + * 2) 未匹配到 → 取窗口里最近一次非 -999(缺失哨兵)样本 + * 3) 仍取不到 → 返回 -999(此时滞回天然放行,等价于允许切换, + * 因为当前区域已彻底失去信号) + */ + private int resolveCurrentAreaRssi(Long deviceId, Long currentAreaId, + Map matchedBeacons) { + MatchedBeacon matched = matchedBeacons.get(currentAreaId); + if (matched != null) { + return matched.rssi; + } + List window = windowRedisDAO.getWindow(deviceId, currentAreaId); + if (window != null) { + for (int i = window.size() - 1; i >= 0; i--) { + Integer sample = window.get(i); + if (sample != null && sample != -999) { + return sample; + } + } + } + return -999; + } + /** * 找到信号最强且满足进入条件的候选区域 * @@ -366,7 +436,8 @@ public class TrajectoryDetectionProcessor { // ==================== 事件发布 ==================== - private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) { + private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi, + LocalDateTime eventTime) { try { IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); TrajectoryEnterEvent event = TrajectoryEnterEvent.builder() @@ -376,6 +447,7 @@ public class TrajectoryDetectionProcessor { .areaId(areaId) .beaconMac(beaconMac) .enterRssi(enterRssi) + .eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString()) .tenantId(TenantContextHolder.getTenantId()) .build(); @@ -390,7 +462,7 @@ public class TrajectoryDetectionProcessor { } private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac, - String leaveReason, Long enterTimestamp) { + String leaveReason, Long enterTimestamp, LocalDateTime eventTime) { try { IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder() @@ -401,6 +473,7 @@ public class TrajectoryDetectionProcessor { .beaconMac(beaconMac) .leaveReason(leaveReason) .enterTimestamp(enterTimestamp) + .eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString()) .tenantId(TenantContextHolder.getTenantId()) .build();