63 lines
2.5 KiB
Python
63 lines
2.5 KiB
Python
|
|
import sys
|
|||
|
|
sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge')
|
|||
|
|
|
|||
|
|
from algorithms import LeavePostAlgorithm
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
|
|||
|
|
def test_alarm_trigger_no_duration():
|
|||
|
|
"""测试告警触发时不包含持续时长"""
|
|||
|
|
algo = LeavePostAlgorithm(
|
|||
|
|
confirm_on_duty_sec=1,
|
|||
|
|
confirm_off_duty_sec=1,
|
|||
|
|
leave_countdown_sec=2,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 滑动窗口大小是10秒,我们需要在整个流程中考虑这一点
|
|||
|
|
t0 = datetime(2026, 2, 12, 14, 0, 0)
|
|||
|
|
person_track = [{"matched_rois": [{"roi_id": "roi1"}], "class": "person"}]
|
|||
|
|
|
|||
|
|
# 1. 上岗阶段:持续检测到人 (2秒,超过confirm_on_duty_sec=1秒)
|
|||
|
|
for i in range(20): # 每100ms一帧,共2秒
|
|||
|
|
t = t0 + timedelta(milliseconds=i*100)
|
|||
|
|
algo.process("roi1", "cam1", person_track, t)
|
|||
|
|
|
|||
|
|
print(f"[1] 上岗后状态: {algo.state}")
|
|||
|
|
assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
|
|||
|
|
|
|||
|
|
# 2. 离开阶段:等待11秒让窗口内的人检测完全过期,然后再发空帧
|
|||
|
|
# 关键:需要等待窗口过期(10秒) + 确认时间(1秒)
|
|||
|
|
t_leave_start = t0 + timedelta(seconds=2)
|
|||
|
|
|
|||
|
|
# 发送空帧超过窗口大小,确保detection_ratio变为0
|
|||
|
|
for i in range(120): # 每100ms一帧,共12秒
|
|||
|
|
t = t_leave_start + timedelta(milliseconds=i*100)
|
|||
|
|
algo.process("roi1", "cam1", [], t)
|
|||
|
|
|
|||
|
|
print(f"[2] 离开12秒后状态: {algo.state}")
|
|||
|
|
|
|||
|
|
# 3. 继续发送空帧,等待倒计时结束 (leave_countdown_sec=2秒)
|
|||
|
|
t_wait_alarm = t_leave_start + timedelta(seconds=12)
|
|||
|
|
for i in range(30): # 每100ms一帧,共3秒 (超过倒计时)
|
|||
|
|
t = t_wait_alarm + timedelta(milliseconds=i*100)
|
|||
|
|
alerts = algo.process("roi1", "cam1", [], t)
|
|||
|
|
if alerts:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
print(f"[3] 告警触发后状态: {algo.state}")
|
|||
|
|
print(f" 告警数量: {len(alerts)}")
|
|||
|
|
print(f" 告警内容: {alerts}")
|
|||
|
|
|
|||
|
|
# ✅ 验证:告警不包含 duration_minutes
|
|||
|
|
assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}"
|
|||
|
|
alarm = alerts[0]
|
|||
|
|
assert alarm["alert_type"] == "leave_post"
|
|||
|
|
assert "duration_minutes" not in alarm, "告警不应包含 duration_minutes"
|
|||
|
|
assert "first_frame_time" in alarm, "告警应包含 first_frame_time"
|
|||
|
|
assert alarm["message"] == "人员离岗告警", f"消息错误: {alarm['message']}"
|
|||
|
|
|
|||
|
|
print(f"测试通过:告警不含持续时长")
|
|||
|
|
print(f" first_frame_time: {alarm['first_frame_time']}")
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
test_alarm_trigger_no_duration()
|