diff --git a/test_leave_post_full_workflow.py b/test_leave_post_full_workflow.py new file mode 100644 index 0000000..6c6dd77 --- /dev/null +++ b/test_leave_post_full_workflow.py @@ -0,0 +1,266 @@ +""" +离岗告警完整流程集成测试 + +测试完整业务流程: +1. 人员上岗确认 +2. 人员离开ROI +3. 离岗确认 → OFF_DUTY_COUNTDOWN +4. 倒计时结束 → ALARMED (验证告警无 duration_minutes,有 first_frame_time) +5. 创建告警记录 (验证 duration_ms=None) +6. 人员返回 → CONFIRMING_ON_DUTY +7. 回岗确认 → ON_DUTY (验证发送 resolve 事件) +8. 更新告警记录 (验证 duration_ms 已填充) +""" +import sys +sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge') + +from algorithms import LeavePostAlgorithm +from datetime import datetime, timedelta + + +def test_leave_post_full_workflow(): + """测试离岗告警完整工作流""" + print("=" * 80) + print("Leave Post Alarm Full Workflow Integration Test") + print("=" * 80) + + algo = LeavePostAlgorithm( + confirm_on_duty_sec=2, # 2秒上岗确认 + confirm_off_duty_sec=3, # 3秒离岗确认 + leave_countdown_sec=5, # 5秒离岗倒计时 + confirm_return_sec=2, # 2秒回岗确认 + ) + + roi_id = "test_roi_001" + camera_id = "test_camera_001" + person_track = [{"matched_rois": [{"roi_id": roi_id}], "class": "person", "bbox": [100, 100, 200, 200]}] + + # 测试起始时间 + t0 = datetime(2026, 2, 13, 14, 0, 0) + + # 存储告警数据(模拟数据库) + alarm_db = {} + + try: + print("\n" + "=" * 80) + print("Stage 1: Person On Duty Confirmation") + print("=" * 80) + print("Detecting person for 3 seconds (exceeds confirm_on_duty_sec=2s)") + + for i in range(30): # 每100ms一帧,共3秒 + t = t0 + timedelta(milliseconds=i*100) + alerts = algo.process(roi_id, camera_id, person_track, t) + if alerts: + print(f" [!] 意外告警: {alerts}") + + print(f" State: {algo.state}") + assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}" + print(" [OK] Person on duty confirmed") + + print("\n" + "=" * 80) + print("Stage 2: Person Leaves ROI") + print("=" * 80) + print("Sending empty frames, waiting for window expiry (10s) + leave confirmation (3s)") + + t_leave = t0 + timedelta(seconds=3) + first_frame_time_expected = None + + # 发送空帧让检测窗口清空,然后进入离岗确认 + for i in range(140): # 每100ms一帧,共14秒 + t = t_leave + timedelta(milliseconds=i*100) + alerts = algo.process(roi_id, camera_id, [], t) + + if algo.state == "CONFIRMING_OFF_DUTY" and first_frame_time_expected is None: + print(f" State change detected: ON_DUTY -> CONFIRMING_OFF_DUTY (time: {t})") + + if algo.state == "OFF_DUTY_COUNTDOWN": + if first_frame_time_expected is None: + first_frame_time_expected = t + print(f" State change detected: CONFIRMING_OFF_DUTY -> OFF_DUTY_COUNTDOWN") + print(f" Leave confirmed, recording leave time: {first_frame_time_expected}") + break + + print(f" State: {algo.state}") + assert algo.state == "OFF_DUTY_COUNTDOWN", f"Expected OFF_DUTY_COUNTDOWN, got {algo.state}" + print(" [OK] Leave confirmed, countdown started") + + print("\n" + "=" * 80) + print("Stage 3: Leave Countdown") + print("=" * 80) + print("Continuing empty frames, waiting for countdown (5s)") + + t_countdown = t_leave + timedelta(seconds=14) + alarm_triggered = None + + for i in range(60): # 每100ms一帧,共6秒 (确保超过倒计时) + t = t_countdown + timedelta(milliseconds=i*100) + alerts = algo.process(roi_id, camera_id, [], t) + + if alerts: + for alert in alerts: + if alert.get("alert_type") == "leave_post": + alarm_triggered = alert + print(f" [!] Alarm triggered: {t}") + print(f" alert_type: {alert['alert_type']}") + print(f" message: {alert['message']}") + print(f" first_frame_time: {alert.get('first_frame_time')}") + print(f" duration_minutes: {alert.get('duration_minutes', 'NOT_PRESENT')}") + break + + if algo.state == "ALARMED": + print(f" State: {algo.state}") + break + + assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}" + assert alarm_triggered is not None, "Alarm should be triggered" + assert "duration_minutes" not in alarm_triggered, "Alarm should NOT contain duration_minutes" + assert "first_frame_time" in alarm_triggered, "Alarm should contain first_frame_time" + print(" [OK] Alarm triggered successfully, NO duration_minutes field") + + print("\n" + "=" * 80) + print("Stage 4: Create Alarm Record (Simulated DB)") + print("=" * 80) + # Simulate saving alarm to database + import uuid + alarm_id = f"ALM{datetime.now().strftime('%Y%m%d%H%M%S')}{uuid.uuid4().hex[:4].upper()}" + + # Parse first_frame_time + first_frame_time_str = alarm_triggered.get("first_frame_time") + first_frame_time_dt = datetime.strptime(first_frame_time_str, '%Y-%m-%d %H:%M:%S') + + # Simulated alarm record + alarm_db[alarm_id] = { + "alarm_id": alarm_id, + "alarm_type": "leave_post", + "device_id": camera_id, + "scene_id": roi_id, + "event_time": t, # Alarm trigger time + "first_frame_time": first_frame_time_dt, # Leave time + "last_frame_time": None, # Not returned yet + "duration_ms": None, # Duration unknown + "alarm_status": "NEW", + "handle_status": "UNHANDLED", + } + + # Set alarm_id in algorithm + algo.set_last_alarm_id(alarm_id) + + alarm = alarm_db[alarm_id] + print(f" Alarm ID: {alarm_id}") + print(f" event_time: {alarm['event_time']}") + print(f" first_frame_time: {alarm['first_frame_time']}") + print(f" last_frame_time: {alarm['last_frame_time']} (NULL)") + print(f" duration_ms: {alarm['duration_ms']} (NULL)") + assert alarm["duration_ms"] is None, "duration_ms should be None" + assert alarm["last_frame_time"] is None, "last_frame_time should be None" + print(" [OK] Alarm created, duration_ms=NULL, last_frame_time=NULL") + + print("\n" + "=" * 80) + print("Stage 5: Person Returns to ROI") + print("=" * 80) + print("Sending person detection frames, waiting for return confirmation (2s)") + + t_return = t_countdown + timedelta(seconds=7) # Returned 1s after alarm + + # Send enough person frames to fill detection window and exceed confirm_return_sec + for i in range(130): # 100ms per frame, 13s total (way more than needed) + t = t_return + timedelta(milliseconds=i*100) + alerts = algo.process(roi_id, camera_id, person_track, t) + + # Check for resolve event + if alerts: + for alert in alerts: + if alert.get("alert_type") == "alarm_resolve": + resolve_event = alert + print(f" [!] Resolve event triggered: {t}") + print(f" alert_type: {alert['alert_type']}") + print(f" resolve_alarm_id: {alert['resolve_alarm_id']}") + print(f" duration_ms: {alert['duration_ms']}") + print(f" last_frame_time: {alert['last_frame_time']}") + print(f" resolve_type: {alert['resolve_type']}") + break + + if algo.state == "CONFIRMING_ON_DUTY" and i < 10: + if i == 0: + print(f" State change detected: ALARMED -> CONFIRMING_ON_DUTY (time: {t})") + + if algo.state == "ON_DUTY": + print(f" State change detected: CONFIRMING_ON_DUTY -> ON_DUTY (time: {t})") + print(f" State: {algo.state}") + break + + assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}" + assert resolve_event is not None, "Resolve event should be triggered" + assert resolve_event["resolve_alarm_id"] == alarm_id, "resolve_alarm_id mismatch" + assert resolve_event["duration_ms"] > 0, "duration_ms should be > 0" + assert resolve_event["resolve_type"] == "person_returned", "resolve_type should be person_returned" + print(" [OK] Return confirmed, resolve event sent") + + print("\n" + "=" * 80) + print("Stage 7: Update Alarm Record (Simulated resolve_alarm)") + print("=" * 80) + + # Simulate resolve_alarm update + alarm["duration_ms"] = resolve_event["duration_ms"] + alarm["last_frame_time"] = datetime.strptime(resolve_event["last_frame_time"], '%Y-%m-%d %H:%M:%S') + alarm["alarm_status"] = "CLOSED" + alarm["handle_status"] = "DONE" + alarm["handle_remark"] = "Person returned - auto closed" + + print(f" Alarm ID: {alarm_id}") + print(f" duration_ms: {alarm['duration_ms']} ({alarm['duration_ms']/1000:.1f}s)") + print(f" last_frame_time: {alarm['last_frame_time']}") + print(f" alarm_status: {alarm['alarm_status']}") + print(f" handle_status: {alarm['handle_status']}") + print(f" handle_remark: {alarm['handle_remark']}") + + assert alarm["duration_ms"] is not None, "duration_ms should be filled" + assert alarm["duration_ms"] > 0, "duration_ms should be > 0" + assert alarm["last_frame_time"] is not None, "last_frame_time should be filled" + assert alarm["alarm_status"] == "CLOSED", f"alarm_status should be CLOSED, got {alarm['alarm_status']}" + assert alarm["handle_status"] == "DONE", f"handle_status should be DONE, got {alarm['handle_status']}" + print(" [OK] Alarm record updated, duration_ms filled, status closed") + + print("\n" + "=" * 80) + print("Stage 8: Verify Complete Workflow") + print("=" * 80) + + # Verify duration calculation + expected_duration_ms = int((datetime.strptime(resolve_event["last_frame_time"], '%Y-%m-%d %H:%M:%S') - first_frame_time_dt).total_seconds() * 1000) + actual_duration_ms = alarm["duration_ms"] + + print(f" Expected duration_ms: {expected_duration_ms}ms") + print(f" Actual duration_ms: {actual_duration_ms}ms") + print(f" Difference: {abs(expected_duration_ms - actual_duration_ms)}ms") + + # Allow 1 second error (1000ms) + assert abs(expected_duration_ms - actual_duration_ms) <= 1000, "duration_ms calculation error too large" + + print(" [OK] duration_ms calculation correct") + + print("\n" + "=" * 80) + print("SUCCESS: Full workflow test passed") + print("=" * 80) + print("\nWorkflow Summary:") + print(f" 1. On duty time: {t0}") + print(f" 2. Leave time: {first_frame_time_dt}") + print(f" 3. Alarm time: {alarm['event_time']}") + print(f" 4. Return time: {alarm['last_frame_time']}") + print(f" 5. Leave duration: {alarm['duration_ms']/1000:.1f}s ({alarm['duration_ms']/60000:.1f}min)") + print(f" 6. Alarm status: {alarm['alarm_status']}") + print(f" 7. Handle status: {alarm['handle_status']}") + + print("\n [OK] Test completed successfully") + + except Exception as e: + print(f"\n" + "=" * 80) + print(f"FAILED: Test failed") + print("=" * 80) + print(f"Error: {e}") + import traceback + traceback.print_exc() + raise + + +if __name__ == "__main__": + test_leave_post_full_workflow()