Files
security-ai-edge/test_garbage_algorithm.py
16337 a891deba00 新增:垃圾检测算法 GarbageDetectionAlgorithm v1.0
Edge 端实现:
- algorithms.py 新增 GarbageDetectionAlgorithm 类
  状态机:IDLE → CONFIRMING_GARBAGE → ALARMED → CONFIRMING_CLEAR → IDLE
  默认参数:confirm_garbage_sec=60, confirm_clear_sec=60, cooldown_sec=1800
  target_classes=['garbage'], alarm_level=2(普通)
  与 IllegalParking 同构但去掉 PARKED_COUNTDOWN 阶段
- AlgorithmManager 6 处集成:
  _PARAM_TYPES、default_params、load_bind_from_redis(热更新)、
  update_algorithm_params、register_algorithm、get_algorithm_status

测试:test_garbage_algorithm.py 覆盖 8 个场景,全部通过
- 无垃圾保持 IDLE
- 持续 60s 有垃圾 → 触发告警
- 冷却期内不重复触发
- 清理后发 resolve → IDLE
- 清理期内垃圾再出现 → 回 ALARMED
- reset() 清空状态
- 多目标计数
- 非 target_class 忽略

WVP 后端/前端改动方案预留在 docs/garbage_algorithm_backend_frontend_plan.md
(后续 ROI 绑定时再实施,本次只改 Edge 端)
2026-04-17 14:57:19 +08:00

315 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
GarbageDetectionAlgorithm 单元测试
覆盖场景:
1. 无垃圾时保持 IDLE
2. 持续检测到垃圾 → 确认 → 告警
3. 冷却期内不重复触发
4. 清理后发 resolve → 回到 IDLE
5. 清理确认期内垃圾再次出现 → 回到 ALARMED
6. reset() 正确清理状态
"""
import sys
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from datetime import datetime, timedelta
from algorithms import GarbageDetectionAlgorithm
# ===== 工具函数 =====
def make_tracks(roi_id: str, classes: list, confidences: list = None):
    """Build simulated detection tracks that all match a single ROI.

    Args:
        roi_id: ROI identifier; used as the track-id prefix and inside
            ``matched_rois``.
        classes: Class label for each simulated track.
        confidences: Per-track scores, indexed like ``classes``. When
            omitted, every track gets a score of 0.85.

    Returns:
        A list with one dict per entry of ``classes``, carrying the keys
        ``track_id``, ``class``, ``confidence``, ``bbox`` and
        ``matched_rois``.
    """
    scores = [0.85] * len(classes) if confidences is None else confidences
    return [
        {
            "track_id": f"{roi_id}_{idx}",
            "class": label,
            "confidence": scores[idx],
            # Each box is shifted 50 px to the right of the previous one.
            "bbox": [100 + idx * 50, 100, 200 + idx * 50, 300],
            "matched_rois": [{"roi_id": roi_id}],
        }
        for idx, label in enumerate(classes)
    ]
def simulate(algo, roi_id, camera_id, get_tracks_fn, count, interval=1.0, start_time=None):
    """Feed ``count`` consecutive simulated frames into ``algo.process``.

    Args:
        algo: Object exposing ``process(roi_id, camera_id, tracks, timestamp)``.
        roi_id: ROI identifier forwarded to every ``process`` call.
        camera_id: Camera identifier forwarded to every ``process`` call.
        get_tracks_fn: Callable mapping the frame index to that frame's tracks.
        count: Number of frames to simulate.
        interval: Seconds between consecutive frames.
        start_time: Timestamp of the first frame; defaults to
            2026-04-17 10:00:00.

    Returns:
        A ``(alerts, next_time)`` tuple: every alert emitted, in order, and
        the timestamp one interval after the last processed frame.
    """
    now = start_time or datetime(2026, 4, 17, 10, 0, 0)
    collected = []
    for frame_idx in range(count):
        frame_tracks = get_tracks_fn(frame_idx)
        frame_alerts = algo.process(roi_id, camera_id, frame_tracks, now)
        # A falsy result (None or an empty list) contributes nothing.
        collected.extend(frame_alerts or [])
        now += timedelta(seconds=interval)
    return collected, now
# ===== 测试 1: 无垃圾时保持 IDLE =====
def test_idle_when_no_garbage():
    """Scenario 1: frames that never contain garbage leave the state at IDLE."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 1: 无垃圾帧始终保持 IDLE")
    print(banner)

    def person_only(_frame_idx):
        # Every frame shows a person and no garbage.
        return make_tracks("roi_1", ["person"])

    detector = GarbageDetectionAlgorithm(confirm_garbage_sec=60)
    alerts, _ = simulate(detector, "roi_1", "cam_1", person_only, count=100)

    assert detector.state == "IDLE", f"Expected IDLE, got {detector.state}"
    assert len(alerts) == 0, f"Expected no alerts, got {len(alerts)}"
    print(f" 状态: {detector.state}alerts: {len(alerts)} [OK]")
# ===== 测试 2: 持续检测到垃圾 → 告警 =====
def test_garbage_triggers_alarm():
    """Scenario 2: 65 s of continuous garbage raises exactly one alarm."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 2: 持续 65 秒检测到垃圾 → 告警")
    print(banner)

    def garbage_every_frame(_frame_idx):
        return make_tracks("roi_1", ["garbage"])

    detector = GarbageDetectionAlgorithm(confirm_garbage_sec=60, cooldown_sec=1800)
    # 65 one-second frames: longer than the 60 s confirmation period.
    alerts, _ = simulate(detector, "roi_1", "cam_1", garbage_every_frame, count=65)

    # A single alert is expected, raised around second 60-61.
    assert detector.state == "ALARMED", f"Expected ALARMED, got {detector.state}"
    assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}"

    first = alerts[0]
    assert first["alert_type"] == "garbage"
    assert first["alarm_level"] == 2
    assert first["garbage_count"] == 1
    assert "检测到垃圾" in first["message"]
    print(f" 状态: {detector.state},告警: {first['message']} [OK]")
# ===== 测试 3: 冷却期内不重复触发 =====
def test_cooldown_prevents_duplicate():
    """Scenario 3: garbage that persists inside the cooldown alarms only once."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 3: 告警后冷却期内持续有垃圾,不重复触发")
    print(banner)

    def garbage_every_frame(_frame_idx):
        return make_tracks("roi_1", ["garbage"])

    detector = GarbageDetectionAlgorithm(
        confirm_garbage_sec=10,  # shortened to keep the test quick
        confirm_clear_sec=10,
        cooldown_sec=300,  # 5-minute cooldown
    )
    # 200 s of continuous garbage: far longer than the 10 s confirmation
    # period, yet still inside the 300 s cooldown.
    alerts, _ = simulate(detector, "roi_1", "cam_1", garbage_every_frame, count=200)

    assert len(alerts) == 1, f"Expected 1 alert (cooldown), got {len(alerts)}"
    assert detector.state == "ALARMED"
    print(f" 告警次数: {len(alerts)}(冷却期内不重复)[OK]")
# ===== 测试 4: 清理后发 resolve → IDLE =====
def test_resolve_after_cleaning():
    """Scenario 4: once the garbage is cleaned up, a resolve is sent and state is IDLE.

    Both phases are driven through ``simulate`` so that a falsy result from
    ``process`` (for example ``None``) is tolerated in the same way as in the
    other tests, instead of being passed straight to ``list.extend``.
    """
    print("\n" + "=" * 60)
    print("TEST 4: 告警后清理 → 发 resolve → IDLE")
    print("=" * 60)
    algo = GarbageDetectionAlgorithm(
        confirm_garbage_sec=10,
        confirm_clear_sec=10,
        cooldown_sec=300,
    )
    algo._last_alarm_id = "test_alarm_123"  # mimics the back-fill done by main.py
    start = datetime(2026, 4, 17, 10, 0, 0)

    # Phase 1: 15 s with garbage (seconds 0-14) -> alarm raised.
    alarm_alerts, clear_start = simulate(
        algo, "roi_1", "cam_1",
        lambda _i: make_tracks("roi_1", ["garbage"]),
        count=15,
        start_time=start,
    )
    assert algo.state == "ALARMED"

    # Phase 2: 30 s without garbage (seconds 15-44) -> resolve sent.
    # Per the original note, the sliding window (10 s) has to drain and then
    # confirm_clear_sec (10 s) has to elapse, so 20+ s are needed.
    clear_alerts, _ = simulate(
        algo, "roi_1", "cam_1",
        lambda _i: make_tracks("roi_1", []),  # empty frame
        count=30,
        start_time=clear_start,
    )
    all_alerts = alarm_alerts + clear_alerts

    assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
    resolves = [a for a in all_alerts if a.get("alert_type") == "alarm_resolve"]
    assert len(resolves) == 1, f"Expected 1 resolve, got {len(resolves)}"
    resolve = resolves[0]
    assert resolve["resolve_alarm_id"] == "test_alarm_123"
    assert resolve["resolve_type"] == "garbage_removed"
    assert resolve["duration_ms"] > 0
    print(f" resolve: {resolve['resolve_type']}, 持续 {resolve['duration_ms']}ms [OK]")
# ===== 测试 5: 清理期内垃圾再出现 → 回到 ALARMED =====
def test_garbage_reappears_during_clearing():
    """Scenario 5: garbage returning during clear-confirmation goes back to ALARMED."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 5: 清理确认期内垃圾再出现 → 回到 ALARMED")
    print(banner)

    detector = GarbageDetectionAlgorithm(
        confirm_garbage_sec=10,
        confirm_clear_sec=20,  # deliberately long clear-confirmation period
        cooldown_sec=300,
    )
    detector._last_alarm_id = "test_alarm_456"
    origin = datetime(2026, 4, 17, 10, 0, 0)

    def feed(classes, seconds):
        # One frame per listed second offset, all with the same class list.
        for sec in seconds:
            detector.process(
                "roi_1", "cam_1",
                make_tracks("roi_1", classes),
                origin + timedelta(seconds=sec),
            )

    # Phase 1: seconds 0-14 with garbage -> alarm -> ALARMED.
    feed(["garbage"], range(15))
    assert detector.state == "ALARMED"

    # Phase 2: seconds 15-24 without garbage -> CONFIRMING_CLEAR.
    feed([], range(15, 25))
    assert detector.state == "CONFIRMING_CLEAR", f"got {detector.state}"

    # Phase 3: seconds 25-39 with garbage again -> back to ALARMED.
    feed(["garbage"], range(25, 40))
    assert detector.state == "ALARMED", f"Expected ALARMED, got {detector.state}"
    print(" 状态恢复: CONFIRMING_CLEAR → ALARMED [OK]")
# ===== 测试 6: reset() 清理状态 =====
def test_reset_clears_state():
    """Scenario 6: ``reset()`` returns the algorithm to a pristine IDLE state."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 6: reset() 正确清理所有状态")
    print(banner)

    detector = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
    detector._last_alarm_id = "test"

    # Drive the algorithm into a non-trivial state first.
    origin = datetime(2026, 4, 17, 10, 0, 0)
    for sec in range(10):
        frame = make_tracks("roi_1", ["garbage"])
        detector.process("roi_1", "cam_1", frame, origin + timedelta(seconds=sec))
    assert detector.state == "ALARMED"
    assert len(detector._detection_window) > 0
    assert len(detector.alert_cooldowns) > 0

    detector.reset()

    # Everything populated above must be cleared again.
    assert detector.state == "IDLE"
    for attr_name in ("state_start_time", "_last_alarm_id", "_garbage_start_time"):
        assert getattr(detector, attr_name) is None
    assert len(detector._detection_window) == 0
    assert len(detector.alert_cooldowns) == 0
    print(" 所有状态已清空 [OK]")
# ===== 测试 7: 多个垃圾目标计数 =====
def test_multiple_garbage_count():
    """Scenario 7: several garbage targets in the ROI are counted in the alert."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 7: ROI 内多个垃圾目标 → garbage_count 正确")
    print(banner)

    def three_garbage_items(_frame_idx):
        return make_tracks("roi_1", ["garbage"] * 3)

    detector = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
    alerts, _ = simulate(detector, "roi_1", "cam_1", three_garbage_items, count=10)

    assert len(alerts) == 1
    assert alerts[0]["garbage_count"] == 3
    print(f" garbage_count: {alerts[0]['garbage_count']} [OK]")
# ===== 测试 8: 非 target_class 不计入 =====
def test_non_target_class_ignored():
    """Scenario 8: person/car detections never count as garbage."""
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 8: person/car 类不计入(只看 garbage")
    print(banner)

    def non_garbage_frame(_frame_idx):
        # Neither class is "garbage".
        return make_tracks("roi_1", ["person", "car"])

    detector = GarbageDetectionAlgorithm(confirm_garbage_sec=10)
    alerts, _ = simulate(detector, "roi_1", "cam_1", non_garbage_frame, count=30)

    assert detector.state == "IDLE", f"Expected IDLE, got {detector.state}"
    assert len(alerts) == 0
    print(f" 状态: {detector.state},无告警 [OK]")
# ===== 运行所有测试 =====
if __name__ == "__main__":
    # Run every scenario in order, tally the outcomes, and exit non-zero
    # if any scenario failed.
    tests = [
        test_idle_when_no_garbage,
        test_garbage_triggers_alarm,
        test_cooldown_prevents_duplicate,
        test_resolve_after_cleaning,
        test_garbage_reappears_during_clearing,
        test_reset_clears_state,
        test_multiple_garbage_count,
        test_non_target_class_ignored,
    ]
    passed = 0
    failed = 0
    for test_fn in tests:
        try:
            test_fn()
        except AssertionError as exc:
            # A failed expectation: report the assertion message only.
            print(f" FAIL: {exc}")
            failed += 1
        except Exception as exc:
            # Anything else is an unexpected crash: show the full traceback.
            print(f" ERROR: {exc}")
            import traceback
            traceback.print_exc()
            failed += 1
        else:
            passed += 1

    separator = "=" * 60
    print("\n" + separator)
    print(f"结果: {passed} 通过, {failed} 失败")
    print(separator)
    sys.exit(1 if failed else 0)