Files
security-ai-edge/test_leave_post_full_workflow.py

267 lines
12 KiB
Python
Raw Normal View History

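"""
Leave-post alarm full-workflow integration test.

Exercises the complete business flow:
1. Person on-duty confirmation
2. Person leaves the ROI
3. Leave confirmation -> OFF_DUTY_COUNTDOWN
4. Countdown ends -> ALARMED (verify the alarm has no duration_minutes but has first_frame_time)
5. Create the alarm record (verify duration_ms=None)
6. Person returns -> CONFIRMING_ON_DUTY
7. Return confirmation -> ON_DUTY (verify a resolve event is sent)
8. Update the alarm record (verify duration_ms is filled in)
"""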
import sys
sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge')
from algorithms import LeavePostAlgorithm
from datetime import datetime, timedelta
def test_leave_post_full_workflow():
    """Run the leave-post alarm workflow end to end against the algorithm.

    A ``LeavePostAlgorithm`` instance is fed synthetic frames spaced 100 ms
    apart and its ``state`` attribute and returned alerts are checked after
    each stage:

    1. Frames containing a person -> state must become ``ON_DUTY``.
    2. Empty frames -> state must reach ``OFF_DUTY_COUNTDOWN``.
    3. More empty frames -> state must become ``ALARMED`` and a
       ``leave_post`` alert must be returned that carries
       ``first_frame_time`` and does not carry ``duration_minutes``.
    4. A simulated alarm record is stored in an in-memory dict with
       ``duration_ms`` and ``last_frame_time`` left as ``None``.
    5. Frames containing a person again -> state must return to ``ON_DUTY``
       and an ``alarm_resolve`` alert referencing the stored alarm id must
       be returned.
    6. The simulated record is updated from the resolve alert and the
       reported ``duration_ms`` is cross-checked against the timestamps
       (1 second tolerance).

    Raises:
        AssertionError: If any stage ends in an unexpected state or an alert
            payload does not match expectations.
        Exception: Any other error is printed with a traceback and re-raised.
    """
    print("=" * 80)
    print("Leave Post Alarm Full Workflow Integration Test")
    print("=" * 80)

    algo = LeavePostAlgorithm(
        confirm_on_duty_sec=2,  # 2 s on-duty confirmation
        confirm_off_duty_sec=3,  # 3 s off-duty confirmation
        leave_countdown_sec=5,  # 5 s leave countdown
        confirm_return_sec=2,  # 2 s return confirmation
    )
    roi_id = "test_roi_001"
    camera_id = "test_camera_001"
    person_track = [{"matched_rois": [{"roi_id": roi_id}], "class": "person", "bbox": [100, 100, 200, 200]}]

    # Start of the simulated timeline.
    t0 = datetime(2026, 2, 13, 14, 0, 0)
    # Stores alarm records (stands in for the database).
    alarm_db = {}

    try:
        print("\n" + "=" * 80)
        print("Stage 1: Person On Duty Confirmation")
        print("=" * 80)
        print("Detecting person for 3 seconds (exceeds confirm_on_duty_sec=2s)")
        for i in range(30):  # one frame every 100 ms, 3 s in total
            t = t0 + timedelta(milliseconds=i * 100)
            alerts = algo.process(roi_id, camera_id, person_track, t)
            if alerts:
                print(f" [!] 意外告警: {alerts}")
        print(f" State: {algo.state}")
        assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
        print(" [OK] Person on duty confirmed")

        print("\n" + "=" * 80)
        print("Stage 2: Person Leaves ROI")
        print("=" * 80)
        print("Sending empty frames, waiting for window expiry (10s) + leave confirmation (3s)")
        t_leave = t0 + timedelta(seconds=3)
        first_frame_time_expected = None
        # Print the CONFIRMING_OFF_DUTY transition once, not on every frame
        # spent in that state.
        confirming_logged = False
        # Send empty frames so the detection window drains and the algorithm
        # moves into leave confirmation.
        for i in range(140):  # one frame every 100 ms, 14 s in total
            t = t_leave + timedelta(milliseconds=i * 100)
            algo.process(roi_id, camera_id, [], t)
            if algo.state == "CONFIRMING_OFF_DUTY" and not confirming_logged:
                confirming_logged = True
                print(f" State change detected: ON_DUTY -> CONFIRMING_OFF_DUTY (time: {t})")
            if algo.state == "OFF_DUTY_COUNTDOWN":
                first_frame_time_expected = t
                print(" State change detected: CONFIRMING_OFF_DUTY -> OFF_DUTY_COUNTDOWN")
                print(f" Leave confirmed, recording leave time: {first_frame_time_expected}")
                break
        print(f" State: {algo.state}")
        assert algo.state == "OFF_DUTY_COUNTDOWN", f"Expected OFF_DUTY_COUNTDOWN, got {algo.state}"
        print(" [OK] Leave confirmed, countdown started")

        print("\n" + "=" * 80)
        print("Stage 3: Leave Countdown")
        print("=" * 80)
        print("Continuing empty frames, waiting for countdown (5s)")
        # The timeline resumes at the nominal end of stage 2 (14 s), even if
        # stage 2 broke out of its loop earlier.
        t_countdown = t_leave + timedelta(seconds=14)
        alarm_triggered = None
        for i in range(60):  # one frame every 100 ms, 6 s in total (longer than the countdown)
            t = t_countdown + timedelta(milliseconds=i * 100)
            alerts = algo.process(roi_id, camera_id, [], t)
            if alerts:
                for alert in alerts:
                    if alert.get("alert_type") == "leave_post":
                        alarm_triggered = alert
                        print(f" [!] Alarm triggered: {t}")
                        print(f" alert_type: {alert['alert_type']}")
                        print(f" message: {alert['message']}")
                        print(f" first_frame_time: {alert.get('first_frame_time')}")
                        print(f" duration_minutes: {alert.get('duration_minutes', 'NOT_PRESENT')}")
                        break
            if algo.state == "ALARMED":
                print(f" State: {algo.state}")
                break
        assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
        assert alarm_triggered is not None, "Alarm should be triggered"
        assert "duration_minutes" not in alarm_triggered, "Alarm should NOT contain duration_minutes"
        assert "first_frame_time" in alarm_triggered, "Alarm should contain first_frame_time"
        print(" [OK] Alarm triggered successfully, NO duration_minutes field")

        print("\n" + "=" * 80)
        print("Stage 4: Create Alarm Record (Simulated DB)")
        print("=" * 80)
        # Simulate saving the alarm to the database.
        import uuid
        alarm_id = f"ALM{datetime.now().strftime('%Y%m%d%H%M%S')}{uuid.uuid4().hex[:4].upper()}"
        # Parse first_frame_time from the alert payload.
        first_frame_time_str = alarm_triggered.get("first_frame_time")
        first_frame_time_dt = datetime.strptime(first_frame_time_str, '%Y-%m-%d %H:%M:%S')
        # Simulated alarm record.
        alarm_db[alarm_id] = {
            "alarm_id": alarm_id,
            "alarm_type": "leave_post",
            "device_id": camera_id,
            "scene_id": roi_id,
            "event_time": t,  # time of the last stage-3 frame, i.e. when the alarm fired
            "first_frame_time": first_frame_time_dt,  # leave time
            "last_frame_time": None,  # person has not returned yet
            "duration_ms": None,  # duration not known yet
            "alarm_status": "NEW",
            "handle_status": "UNHANDLED",
        }
        # Hand the alarm id back to the algorithm; stage 5 asserts that the
        # resolve alert carries this same id.
        algo.set_last_alarm_id(alarm_id)
        alarm = alarm_db[alarm_id]
        print(f" Alarm ID: {alarm_id}")
        print(f" event_time: {alarm['event_time']}")
        print(f" first_frame_time: {alarm['first_frame_time']}")
        print(f" last_frame_time: {alarm['last_frame_time']} (NULL)")
        print(f" duration_ms: {alarm['duration_ms']} (NULL)")
        assert alarm["duration_ms"] is None, "duration_ms should be None"
        assert alarm["last_frame_time"] is None, "last_frame_time should be None"
        print(" [OK] Alarm created, duration_ms=NULL, last_frame_time=NULL")

        print("\n" + "=" * 80)
        print("Stage 5: Person Returns to ROI")
        print("=" * 80)
        print("Sending person detection frames, waiting for return confirmation (2s)")
        t_return = t_countdown + timedelta(seconds=7)  # 1 s after the nominal 6 s stage-3 window
        # Must be bound before the loop: if no resolve alert is ever emitted,
        # the assertion below should fail with its own message rather than
        # raising UnboundLocalError.
        resolve_event = None
        # Send more person frames than needed to fill the detection window
        # and exceed confirm_return_sec.
        for i in range(130):  # one frame every 100 ms, 13 s in total
            t = t_return + timedelta(milliseconds=i * 100)
            alerts = algo.process(roi_id, camera_id, person_track, t)
            # Look for the resolve event.
            if alerts:
                for alert in alerts:
                    if alert.get("alert_type") == "alarm_resolve":
                        resolve_event = alert
                        print(f" [!] Resolve event triggered: {t}")
                        print(f" alert_type: {alert['alert_type']}")
                        print(f" resolve_alarm_id: {alert['resolve_alarm_id']}")
                        print(f" duration_ms: {alert['duration_ms']}")
                        print(f" last_frame_time: {alert['last_frame_time']}")
                        print(f" resolve_type: {alert['resolve_type']}")
                        break
            if i == 0 and algo.state == "CONFIRMING_ON_DUTY":
                print(f" State change detected: ALARMED -> CONFIRMING_ON_DUTY (time: {t})")
            if algo.state == "ON_DUTY":
                print(f" State change detected: CONFIRMING_ON_DUTY -> ON_DUTY (time: {t})")
                print(f" State: {algo.state}")
                break
        assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
        assert resolve_event is not None, "Resolve event should be triggered"
        assert resolve_event["resolve_alarm_id"] == alarm_id, "resolve_alarm_id mismatch"
        assert resolve_event["duration_ms"] > 0, "duration_ms should be > 0"
        assert resolve_event["resolve_type"] == "person_returned", "resolve_type should be person_returned"
        print(" [OK] Return confirmed, resolve event sent")

        print("\n" + "=" * 80)
        print("Stage 7: Update Alarm Record (Simulated resolve_alarm)")
        print("=" * 80)
        # Simulate the resolve_alarm update of the stored record.
        alarm["duration_ms"] = resolve_event["duration_ms"]
        alarm["last_frame_time"] = datetime.strptime(resolve_event["last_frame_time"], '%Y-%m-%d %H:%M:%S')
        alarm["alarm_status"] = "CLOSED"
        alarm["handle_status"] = "DONE"
        alarm["handle_remark"] = "Person returned - auto closed"
        print(f" Alarm ID: {alarm_id}")
        print(f" duration_ms: {alarm['duration_ms']} ({alarm['duration_ms']/1000:.1f}s)")
        print(f" last_frame_time: {alarm['last_frame_time']}")
        print(f" alarm_status: {alarm['alarm_status']}")
        print(f" handle_status: {alarm['handle_status']}")
        print(f" handle_remark: {alarm['handle_remark']}")
        assert alarm["duration_ms"] is not None, "duration_ms should be filled"
        assert alarm["duration_ms"] > 0, "duration_ms should be > 0"
        assert alarm["last_frame_time"] is not None, "last_frame_time should be filled"
        assert alarm["alarm_status"] == "CLOSED", f"alarm_status should be CLOSED, got {alarm['alarm_status']}"
        assert alarm["handle_status"] == "DONE", f"handle_status should be DONE, got {alarm['handle_status']}"
        print(" [OK] Alarm record updated, duration_ms filled, status closed")

        print("\n" + "=" * 80)
        print("Stage 8: Verify Complete Workflow")
        print("=" * 80)
        # Cross-check the reported duration against the two timestamps.
        # alarm["last_frame_time"] was parsed from the resolve event above.
        expected_duration_ms = int((alarm["last_frame_time"] - first_frame_time_dt).total_seconds() * 1000)
        actual_duration_ms = alarm["duration_ms"]
        print(f" Expected duration_ms: {expected_duration_ms}ms")
        print(f" Actual duration_ms: {actual_duration_ms}ms")
        print(f" Difference: {abs(expected_duration_ms - actual_duration_ms)}ms")
        # Allow 1 second (1000 ms) of error: the timestamps are parsed with
        # whole-second precision.
        assert abs(expected_duration_ms - actual_duration_ms) <= 1000, "duration_ms calculation error too large"
        print(" [OK] duration_ms calculation correct")

        print("\n" + "=" * 80)
        print("SUCCESS: Full workflow test passed")
        print("=" * 80)
        print("\nWorkflow Summary:")
        print(f" 1. On duty time: {t0}")
        print(f" 2. Leave time: {first_frame_time_dt}")
        print(f" 3. Alarm time: {alarm['event_time']}")
        print(f" 4. Return time: {alarm['last_frame_time']}")
        print(f" 5. Leave duration: {alarm['duration_ms']/1000:.1f}s ({alarm['duration_ms']/60000:.1f}min)")
        print(f" 6. Alarm status: {alarm['alarm_status']}")
        print(f" 7. Handle status: {alarm['handle_status']}")
        print("\n [OK] Test completed successfully")
    except Exception as e:
        # Print a readable failure banner for script runs, then re-raise so
        # the failure still propagates to the caller or test runner.
        print("\n" + "=" * 80)
        print("FAILED: Test failed")
        print("=" * 80)
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        raise
# Allow running this integration test directly as a script, without a test runner.
if __name__ == "__main__":
    test_leave_post_full_workflow()