import sys sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge') from algorithms import LeavePostAlgorithm from datetime import datetime, timedelta def test_alarm_trigger_no_duration(): """测试告警触发时不包含持续时长""" algo = LeavePostAlgorithm( confirm_on_duty_sec=1, confirm_off_duty_sec=1, leave_countdown_sec=2, ) # 滑动窗口大小是10秒,我们需要在整个流程中考虑这一点 t0 = datetime(2026, 2, 12, 14, 0, 0) person_track = [{"matched_rois": [{"roi_id": "roi1"}], "class": "person"}] # 1. 上岗阶段:持续检测到人 (2秒,超过confirm_on_duty_sec=1秒) for i in range(20): # 每100ms一帧,共2秒 t = t0 + timedelta(milliseconds=i*100) algo.process("roi1", "cam1", person_track, t) print(f"[1] 上岗后状态: {algo.state}") assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}" # 2. 离开阶段:等待11秒让窗口内的人检测完全过期,然后再发空帧 # 关键:需要等待窗口过期(10秒) + 确认时间(1秒) t_leave_start = t0 + timedelta(seconds=2) # 发送空帧超过窗口大小,确保detection_ratio变为0 for i in range(120): # 每100ms一帧,共12秒 t = t_leave_start + timedelta(milliseconds=i*100) algo.process("roi1", "cam1", [], t) print(f"[2] 离开12秒后状态: {algo.state}") # 3. 继续发送空帧,等待倒计时结束 (leave_countdown_sec=2秒) t_wait_alarm = t_leave_start + timedelta(seconds=12) for i in range(30): # 每100ms一帧,共3秒 (超过倒计时) t = t_wait_alarm + timedelta(milliseconds=i*100) alerts = algo.process("roi1", "cam1", [], t) if alerts: break print(f"[3] 告警触发后状态: {algo.state}") print(f" 告警数量: {len(alerts)}") print(f" 告警内容: {alerts}") # ✅ 验证:告警不包含 duration_minutes assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}" alarm = alerts[0] assert alarm["alert_type"] == "leave_post" assert "duration_minutes" not in alarm, "告警不应包含 duration_minutes" assert "first_frame_time" in alarm, "告警应包含 first_frame_time" assert alarm["message"] == "人员离岗告警", f"消息错误: {alarm['message']}" print(f"测试通过:告警不含持续时长") print(f" first_frame_time: {alarm['first_frame_time']}") if __name__ == "__main__": test_alarm_trigger_no_duration()