Files
security-ai-edge/test_garbage_algorithm.py

315 lines
9.6 KiB
Python
Raw Normal View History

"""
GarbageDetectionAlgorithm 单元测试
覆盖场景
1. 无垃圾时保持 IDLE
2. 持续检测到垃圾 确认 告警
3. 冷却期内不重复触发
4. 清理后发 resolve 回到 IDLE
5. 清理确认期内垃圾再次出现 回到 ALARMED
6. reset() 正确清理状态
"""
import sys
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from datetime import datetime, timedelta
from algorithms import GarbageDetectionAlgorithm
# ===== 工具函数 =====
def make_tracks(roi_id: str, classes: list, confidences: list = None):
"""生成模拟检测结果"""
if confidences is None:
confidences = [0.85] * len(classes)
tracks = []
for i, cls in enumerate(classes):
tracks.append({
"track_id": f"{roi_id}_{i}",
"class": cls,
"confidence": confidences[i],
"bbox": [100 + i * 50, 100, 200 + i * 50, 300],
"matched_rois": [{"roi_id": roi_id}],
})
return tracks
def simulate(algo, roi_id, camera_id, get_tracks_fn, count, interval=1.0, start_time=None):
"""连续模拟帧,返回所有 alerts 和最后时间戳"""
t = start_time or datetime(2026, 4, 17, 10, 0, 0)
all_alerts = []
for i in range(count):
tracks = get_tracks_fn(i)
alerts = algo.process(roi_id, camera_id, tracks, t)
if alerts:
all_alerts.extend(alerts)
t += timedelta(seconds=interval)
return all_alerts, t
# ===== 测试 1无垃圾时保持 IDLE =====
def test_idle_when_no_garbage():
print("\n" + "=" * 60)
print("TEST 1: 无垃圾帧始终保持 IDLE")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=60)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["person"]), # 只有人,没有垃圾
count=100,
)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
assert len(alerts) == 0, f"Expected no alerts, got {len(alerts)}"
print(f" 状态: {algo.state}alerts: {len(alerts)} [OK]")
# ===== 测试 2持续检测到垃圾 → 告警 =====
def test_garbage_triggers_alarm():
print("\n" + "=" * 60)
print("TEST 2: 持续 65 秒检测到垃圾 → 告警")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=60,
cooldown_sec=1800,
)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage"]),
count=65, # 65 秒,超过 60 秒确认期
)
# 应该在第 60-61 秒触发 1 个告警
assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}"
alert = alerts[0]
assert alert["alert_type"] == "garbage"
assert alert["alarm_level"] == 2
assert alert["garbage_count"] == 1
assert "检测到垃圾" in alert["message"]
print(f" 状态: {algo.state},告警: {alert['message']} [OK]")
# ===== 测试 3冷却期内不重复触发 =====
def test_cooldown_prevents_duplicate():
print("\n" + "=" * 60)
print("TEST 3: 告警后冷却期内持续有垃圾,不重复触发")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10, # 缩短便于测试
confirm_clear_sec=10,
cooldown_sec=300, # 5 分钟冷却
)
# 持续 200 秒有垃圾(远超冷却时间但没超过 300 秒)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage"]),
count=200,
)
assert len(alerts) == 1, f"Expected 1 alert (cooldown), got {len(alerts)}"
assert algo.state == "ALARMED"
print(f" 告警次数: {len(alerts)}(冷却期内不重复)[OK]")
# ===== 测试 4清理后发 resolve → IDLE =====
def test_resolve_after_cleaning():
print("\n" + "=" * 60)
print("TEST 4: 告警后清理 → 发 resolve → IDLE")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10,
confirm_clear_sec=10,
cooldown_sec=300,
)
algo._last_alarm_id = "test_alarm_123" # 模拟 main.py 回填
t = datetime(2026, 4, 17, 10, 0, 0)
all_alerts = []
# Phase 1: 15 秒有垃圾 → 触发告警
for i in range(15):
alerts = algo.process(
"roi_1", "cam_1",
make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i)
)
all_alerts.extend(alerts)
assert algo.state == "ALARMED"
# Phase 2: 然后 30 秒无垃圾 → 发 resolve
# 需要等滑动窗口(10s)清空 + confirm_clear_sec(10s) = 20+ 秒
for i in range(15, 45):
alerts = algo.process(
"roi_1", "cam_1",
make_tracks("roi_1", []), # 空
t + timedelta(seconds=i)
)
all_alerts.extend(alerts)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
resolves = [a for a in all_alerts if a.get("alert_type") == "alarm_resolve"]
assert len(resolves) == 1, f"Expected 1 resolve, got {len(resolves)}"
resolve = resolves[0]
assert resolve["resolve_alarm_id"] == "test_alarm_123"
assert resolve["resolve_type"] == "garbage_removed"
assert resolve["duration_ms"] > 0
print(f" resolve: {resolve['resolve_type']}, 持续 {resolve['duration_ms']}ms [OK]")
# ===== 测试 5清理期内垃圾再出现 → 回到 ALARMED =====
def test_garbage_reappears_during_clearing():
print("\n" + "=" * 60)
print("TEST 5: 清理确认期内垃圾再出现 → 回到 ALARMED")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10,
confirm_clear_sec=20, # 较长的清理确认期
cooldown_sec=300,
)
algo._last_alarm_id = "test_alarm_456"
t = datetime(2026, 4, 17, 10, 0, 0)
# Phase 1: 15 秒有垃圾 → 告警 → ALARMED
for i in range(15):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED"
# Phase 2: 5 秒无垃圾 → CONFIRMING_CLEAR
for i in range(15, 25):
algo.process("roi_1", "cam_1", make_tracks("roi_1", []),
t + timedelta(seconds=i))
assert algo.state == "CONFIRMING_CLEAR", f"got {algo.state}"
# Phase 3: 垃圾又出现 5 秒 → 回到 ALARMED
for i in range(25, 40):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
print(f" 状态恢复: CONFIRMING_CLEAR → ALARMED [OK]")
# ===== 测试 6reset() 清理状态 =====
def test_reset_clears_state():
print("\n" + "=" * 60)
print("TEST 6: reset() 正确清理所有状态")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
algo._last_alarm_id = "test"
# 先让它进入某个状态
t = datetime(2026, 4, 17, 10, 0, 0)
for i in range(10):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED"
assert len(algo._detection_window) > 0
assert len(algo.alert_cooldowns) > 0
# Reset
algo.reset()
assert algo.state == "IDLE"
assert algo.state_start_time is None
assert algo._last_alarm_id is None
assert algo._garbage_start_time is None
assert len(algo._detection_window) == 0
assert len(algo.alert_cooldowns) == 0
print(" 所有状态已清空 [OK]")
# ===== 测试 7多个垃圾目标计数 =====
def test_multiple_garbage_count():
print("\n" + "=" * 60)
print("TEST 7: ROI 内多个垃圾目标 → garbage_count 正确")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage", "garbage", "garbage"]),
count=10,
)
assert len(alerts) == 1
assert alerts[0]["garbage_count"] == 3
print(f" garbage_count: {alerts[0]['garbage_count']} [OK]")
# ===== 测试 8非 target_class 不计入 =====
def test_non_target_class_ignored():
print("\n" + "=" * 60)
print("TEST 8: person/car 类不计入(只看 garbage")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=10)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["person", "car"]), # 都不是 garbage
count=30,
)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
assert len(alerts) == 0
print(f" 状态: {algo.state},无告警 [OK]")
# ===== 运行所有测试 =====
if __name__ == "__main__":
tests = [
test_idle_when_no_garbage,
test_garbage_triggers_alarm,
test_cooldown_prevents_duplicate,
test_resolve_after_cleaning,
test_garbage_reappears_during_clearing,
test_reset_clears_state,
test_multiple_garbage_count,
test_non_target_class_ignored,
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except AssertionError as e:
print(f" FAIL: {e}")
failed += 1
except Exception as e:
print(f" ERROR: {e}")
import traceback
traceback.print_exc()
failed += 1
print("\n" + "=" * 60)
print(f"结果: {passed} 通过, {failed} 失败")
print("=" * 60)
sys.exit(0 if failed == 0 else 1)