Compare commits
5 Commits
ecebdd514f
...
6d408386bc
| Author | SHA1 | Date | |
|---|---|---|---|
| 6d408386bc | |||
| 690eb66277 | |||
| 5b2440c467 | |||
| 7496a6fe04 | |||
| 4ebded3385 |
407
algorithms.py
407
algorithms.py
@@ -17,55 +17,86 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
class LeavePostAlgorithm:
|
||||
STATE_WAITING = "WAITING"
|
||||
STATE_CONFIRMING = "CONFIRMING"
|
||||
STATE_ON_DUTY = "ON_DUTY"
|
||||
STATE_LEAVING = "LEAVING"
|
||||
STATE_OFF_DUTY = "OFF_DUTY"
|
||||
STATE_NON_WORK_TIME = "NON_WORK_TIME"
|
||||
"""
|
||||
离岗检测算法(优化版 v2.0)
|
||||
|
||||
状态机:
|
||||
INIT → CONFIRMING_ON_DUTY → ON_DUTY → CONFIRMING_OFF_DUTY
|
||||
→ OFF_DUTY_COUNTDOWN → ALARMED → (回岗) → CONFIRMING_ON_DUTY
|
||||
|
||||
业务流程:
|
||||
1. 启动后检测到人 → 上岗确认期(confirm_on_duty_sec,默认10秒)
|
||||
2. 确认上岗后 → 在岗状态(ON_DUTY)
|
||||
3. 人离开ROI → 离岗确认期(confirm_off_duty_sec,默认30秒)
|
||||
4. 确认离岗后 → 离岗倒计时(leave_countdown_sec,默认300秒)
|
||||
5. 倒计时结束 → 触发告警(ALARMED状态)
|
||||
6. 人员回岗 → 回岗确认期(confirm_return_sec,默认10秒)
|
||||
7. 确认回岗 → 发送resolve事件 → 重新上岗确认
|
||||
"""
|
||||
|
||||
# 状态定义
|
||||
STATE_INIT = "INIT" # 初始化
|
||||
STATE_CONFIRMING_ON_DUTY = "CONFIRMING_ON_DUTY" # 上岗确认中
|
||||
STATE_ON_DUTY = "ON_DUTY" # 在岗
|
||||
STATE_CONFIRMING_OFF_DUTY = "CONFIRMING_OFF_DUTY" # 离岗确认中
|
||||
STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" # 离岗倒计时中
|
||||
STATE_ALARMED = "ALARMED" # 已告警(等待回岗)
|
||||
STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
confirm_on_duty_sec: int = 10,
|
||||
confirm_leave_sec: int = 30,
|
||||
cooldown_sec: int = 600,
|
||||
confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长)
|
||||
confirm_off_duty_sec: int = 30, # 离岗确认窗口(持续未检测到人的时长)
|
||||
confirm_return_sec: int = 10, # 回岗确认窗口(告警后回来需确认时长)
|
||||
leave_countdown_sec: int = 300, # 离岗倒计时(确认离岗后等待告警的时长)
|
||||
cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔)
|
||||
working_hours: Optional[List[Dict]] = None,
|
||||
target_class: Optional[str] = "person",
|
||||
# 兼容旧参数名(向后兼容)
|
||||
confirm_leave_sec: Optional[int] = None,
|
||||
):
|
||||
# 时间参数(处理向后兼容)
|
||||
self.confirm_on_duty_sec = confirm_on_duty_sec
|
||||
self.confirm_leave_sec = confirm_leave_sec
|
||||
self.confirm_off_duty_sec = confirm_leave_sec if confirm_leave_sec is not None else confirm_off_duty_sec
|
||||
self.confirm_return_sec = confirm_return_sec
|
||||
self.leave_countdown_sec = leave_countdown_sec
|
||||
self.cooldown_sec = cooldown_sec
|
||||
|
||||
# 工作时间和目标类别
|
||||
self.working_hours = working_hours or []
|
||||
self.target_class = target_class
|
||||
|
||||
# 状态变量
|
||||
self.state: str = self.STATE_INIT
|
||||
self.state_start_time: Optional[datetime] = None
|
||||
|
||||
# 滑动窗口(用于平滑检测结果)
|
||||
self.detection_window: deque = deque() # [(timestamp, has_person), ...]
|
||||
self.window_size_sec = 10 # 滑动窗口大小:10秒
|
||||
|
||||
# 告警追踪
|
||||
self._last_alarm_id: Optional[str] = None
|
||||
self._leave_start_time: Optional[datetime] = None # 人员离开时间(用于计算持续时长)
|
||||
self._alarm_triggered_time: Optional[datetime] = None # 告警触发时间
|
||||
self.alert_cooldowns: Dict[str, datetime] = {}
|
||||
|
||||
self.state: str = self.STATE_WAITING
|
||||
self.state_start_time: Optional[datetime] = None
|
||||
self.detection_history: deque = deque()
|
||||
|
||||
self.alarm_sent: bool = False
|
||||
self.last_person_time: Optional[datetime] = None
|
||||
|
||||
self._last_alarm_id: Optional[str] = None # 最近一次告警ID
|
||||
self._leave_start_time: Optional[datetime] = None # 离岗开始时间(LEAVING 状态的开始时间)
|
||||
|
||||
def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
|
||||
"""检查是否在工作时间"""
|
||||
if not self.working_hours:
|
||||
return True
|
||||
|
||||
|
||||
import json
|
||||
|
||||
|
||||
working_hours = self.working_hours
|
||||
if isinstance(working_hours, str):
|
||||
try:
|
||||
working_hours = json.loads(working_hours)
|
||||
except:
|
||||
return True
|
||||
|
||||
|
||||
if not working_hours:
|
||||
return True
|
||||
|
||||
|
||||
dt = dt or datetime.now()
|
||||
current_minutes = dt.hour * 60 + dt.minute
|
||||
for period in working_hours:
|
||||
@@ -76,7 +107,7 @@ class LeavePostAlgorithm:
|
||||
if start_minutes <= current_minutes < end_minutes:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _parse_time_to_minutes(self, time_str: str) -> int:
|
||||
"""将时间字符串转换为分钟数"""
|
||||
if isinstance(time_str, int):
|
||||
@@ -88,31 +119,43 @@ class LeavePostAlgorithm:
|
||||
return 0
|
||||
|
||||
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
|
||||
"""检查检测结果是否在ROI内"""
|
||||
matched_rois = detection.get("matched_rois", [])
|
||||
for roi in matched_rois:
|
||||
if roi.get("roi_id") == roi_id:
|
||||
return True
|
||||
return False
|
||||
return any(roi.get("roi_id") == roi_id for roi in matched_rois)
|
||||
|
||||
def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
|
||||
def _check_target_class(self, detection: Dict, target_class: str) -> bool:
|
||||
"""检查是否为目标类别"""
|
||||
if not target_class:
|
||||
return True
|
||||
return detection.get("class") == target_class
|
||||
|
||||
def _get_detection_window(self, current_time: datetime) -> List[bool]:
|
||||
detections = []
|
||||
while self.detection_history and (current_time - self.detection_history[0][0]).total_seconds() > max(self.confirm_on_duty_sec, self.confirm_leave_sec):
|
||||
self.detection_history.popleft()
|
||||
for _, has_person in self.detection_history:
|
||||
detections.append(has_person)
|
||||
return detections
|
||||
|
||||
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
|
||||
"""获取ROI内最新的检测框"""
|
||||
for det in tracks:
|
||||
if self._check_detection_in_roi(det, roi_id):
|
||||
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
|
||||
return det.get("bbox", [])
|
||||
return []
|
||||
|
||||
def _update_detection_window(self, current_time: datetime, has_person: bool):
|
||||
"""更新滑动窗口"""
|
||||
self.detection_window.append((current_time, has_person))
|
||||
|
||||
# 移除窗口外的旧数据
|
||||
while self.detection_window:
|
||||
oldest_time, _ = self.detection_window[0]
|
||||
if (current_time - oldest_time).total_seconds() > self.window_size_sec:
|
||||
self.detection_window.popleft()
|
||||
else:
|
||||
break
|
||||
|
||||
def _get_detection_ratio(self) -> float:
|
||||
"""计算滑动窗口内的检测命中率"""
|
||||
if not self.detection_window:
|
||||
return 0.0
|
||||
|
||||
person_count = sum(1 for _, has_person in self.detection_window if has_person)
|
||||
return person_count / len(self.detection_window)
|
||||
|
||||
def process(
|
||||
self,
|
||||
roi_id: str,
|
||||
@@ -120,125 +163,173 @@ class LeavePostAlgorithm:
|
||||
tracks: List[Dict],
|
||||
current_time: Optional[datetime] = None,
|
||||
) -> List[Dict]:
|
||||
current_time = current_time or datetime.now()
|
||||
"""
|
||||
处理单帧检测结果
|
||||
|
||||
in_work = self._is_in_working_hours(current_time)
|
||||
Args:
|
||||
roi_id: ROI区域ID
|
||||
camera_id: 摄像头ID
|
||||
tracks: 检测结果列表
|
||||
current_time: 当前时间(用于测试,生产环境传None)
|
||||
|
||||
Returns:
|
||||
告警列表 [{"alert_type": "leave_post", ...}, {"alert_type": "alarm_resolve", ...}]
|
||||
"""
|
||||
current_time = current_time or datetime.now()
|
||||
alerts = []
|
||||
|
||||
if not in_work:
|
||||
if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time:
|
||||
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
|
||||
alerts.append({
|
||||
"alert_type": "alarm_resolve",
|
||||
"resolve_alarm_id": self._last_alarm_id,
|
||||
"duration_ms": duration_ms,
|
||||
"last_frame_time": current_time.isoformat(),
|
||||
"resolve_type": "non_work_time",
|
||||
})
|
||||
self._last_alarm_id = None
|
||||
self._leave_start_time = None
|
||||
self.state = self.STATE_NON_WORK_TIME
|
||||
self.detection_history.clear()
|
||||
self.alarm_sent = False
|
||||
return alerts
|
||||
# 检查ROI内是否有目标
|
||||
roi_has_person = any(
|
||||
self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class)
|
||||
for det in tracks
|
||||
)
|
||||
|
||||
if self.state == self.STATE_NON_WORK_TIME:
|
||||
self.state = self.STATE_WAITING
|
||||
self.state_start_time = None
|
||||
self.detection_history.clear()
|
||||
self.alarm_sent = False
|
||||
self.alert_cooldowns.clear() # 新工作时段,清除冷却记录
|
||||
# 更新滑动窗口
|
||||
self._update_detection_window(current_time, roi_has_person)
|
||||
detection_ratio = self._get_detection_ratio()
|
||||
|
||||
roi_has_person = False
|
||||
for det in tracks:
|
||||
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
|
||||
roi_has_person = True
|
||||
break
|
||||
# 检查工作时间
|
||||
in_working_hours = self._is_in_working_hours(current_time)
|
||||
|
||||
if self.state == self.STATE_WAITING:
|
||||
if roi_has_person:
|
||||
# 检测到人,进入上岗确认阶段
|
||||
self.state = self.STATE_CONFIRMING
|
||||
self.state_start_time = current_time
|
||||
self.detection_history.clear()
|
||||
self.detection_history.append((current_time, True))
|
||||
else:
|
||||
pass
|
||||
|
||||
elif self.state == self.STATE_CONFIRMING:
|
||||
self.detection_history.append((current_time, roi_has_person))
|
||||
if not roi_has_person:
|
||||
# 人消失,回到等待状态
|
||||
self.state = self.STATE_WAITING
|
||||
self.state_start_time = None
|
||||
self.detection_history.clear()
|
||||
else:
|
||||
elapsed = (current_time - self.state_start_time).total_seconds()
|
||||
if elapsed >= self.confirm_on_duty_sec:
|
||||
# 持续在岗达到确认时长,正式确认上岗
|
||||
self.state = self.STATE_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
# 确认在岗后清除冷却记录,允许新一轮离岗检测告警
|
||||
self.alert_cooldowns.clear()
|
||||
|
||||
elif self.state == self.STATE_ON_DUTY:
|
||||
self.detection_history.append((current_time, roi_has_person))
|
||||
if not roi_has_person:
|
||||
self.state = self.STATE_LEAVING
|
||||
self.state_start_time = current_time
|
||||
|
||||
elif self.state == self.STATE_LEAVING:
|
||||
self.detection_history.append((current_time, roi_has_person))
|
||||
elapsed = (current_time - self.state_start_time).total_seconds()
|
||||
|
||||
if roi_has_person:
|
||||
self.state = self.STATE_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
elif elapsed >= self.confirm_leave_sec:
|
||||
# 确认离岗后直接触发告警,不再进入 OFF_DUTY 二次等待
|
||||
leaving_start_time = self.state_start_time # 保存 LEAVING 状态开始时间(人员离开时间)
|
||||
self.state = self.STATE_OFF_DUTY
|
||||
self.state_start_time = current_time
|
||||
|
||||
cooldown_key = f"{camera_id}_{roi_id}"
|
||||
now = datetime.now()
|
||||
if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
|
||||
bbox = self._get_latest_bbox(tracks, roi_id)
|
||||
elapsed_minutes = int(elapsed / 60)
|
||||
alerts.append({
|
||||
"track_id": roi_id,
|
||||
"camera_id": camera_id,
|
||||
"bbox": bbox,
|
||||
"duration_minutes": elapsed_minutes,
|
||||
"alert_type": "leave_post",
|
||||
"message": f"离岗 {elapsed_minutes} 分钟",
|
||||
})
|
||||
self.alert_cooldowns[cooldown_key] = now
|
||||
|
||||
# 保存告警追踪信息(alarm_id 由 main.py 通过 set_last_alarm_id() 回填)
|
||||
self._last_alarm_id = None
|
||||
self._leave_start_time = leaving_start_time # LEAVING 状态开始时间 = 人员离开时间
|
||||
|
||||
elif self.state == self.STATE_OFF_DUTY:
|
||||
# OFF_DUTY 状态:只等待人员回岗,不再重复告警
|
||||
# 必须经过 CONFIRMING 重新确认在岗后,才允许新一轮离岗检测
|
||||
if roi_has_person:
|
||||
# 生成 resolve 事件(人员回岗)
|
||||
# === 非工作时间处理 ===
|
||||
if not in_working_hours:
|
||||
if self.state != self.STATE_NON_WORK_TIME:
|
||||
# 进入非工作时间,清理状态
|
||||
if self._last_alarm_id and self._leave_start_time:
|
||||
# 如果有未结束的告警,发送resolve事件(非工作时间自动关闭)
|
||||
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
|
||||
alerts.append({
|
||||
"alert_type": "alarm_resolve",
|
||||
"resolve_alarm_id": self._last_alarm_id,
|
||||
"duration_ms": duration_ms,
|
||||
"last_frame_time": current_time.isoformat(),
|
||||
"resolve_type": "person_returned",
|
||||
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
"resolve_type": "non_work_time",
|
||||
})
|
||||
|
||||
self.state = self.STATE_NON_WORK_TIME
|
||||
self.state_start_time = None
|
||||
self.detection_window.clear()
|
||||
self._last_alarm_id = None
|
||||
self._leave_start_time = None
|
||||
self.state = self.STATE_CONFIRMING
|
||||
self._alarm_triggered_time = None
|
||||
return alerts
|
||||
|
||||
# === 工作时间处理 ===
|
||||
|
||||
# 从非工作时间恢复
|
||||
if self.state == self.STATE_NON_WORK_TIME:
|
||||
self.state = self.STATE_INIT
|
||||
self.state_start_time = current_time
|
||||
self.detection_window.clear()
|
||||
|
||||
# === 状态机处理 ===
|
||||
|
||||
if self.state == self.STATE_INIT:
|
||||
# 初始化状态:等待检测到人
|
||||
if roi_has_person:
|
||||
self.state = self.STATE_CONFIRMING_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
self.detection_history.clear()
|
||||
self.detection_history.append((current_time, True))
|
||||
logger.debug(f"ROI {roi_id}: INIT → CONFIRMING_ON_DUTY")
|
||||
|
||||
elif self.state == self.STATE_CONFIRMING_ON_DUTY:
|
||||
# 上岗确认中:需要持续检测到人
|
||||
elapsed = (current_time - self.state_start_time).total_seconds()
|
||||
|
||||
if detection_ratio == 0:
|
||||
# 人消失了,回到INIT
|
||||
self.state = self.STATE_INIT
|
||||
self.state_start_time = current_time
|
||||
self.detection_window.clear()
|
||||
logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)")
|
||||
elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.7:
|
||||
# 上岗确认成功(命中率>=70%)
|
||||
self.state = self.STATE_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
self.alert_cooldowns.clear() # 确认在岗后清除冷却记录
|
||||
logger.info(f"ROI {roi_id}: CONFIRMING_ON_DUTY → ON_DUTY (上岗确认成功)")
|
||||
|
||||
elif self.state == self.STATE_ON_DUTY:
|
||||
# 在岗状态:监控是否离岗
|
||||
if detection_ratio == 0:
|
||||
# 滑动窗口内完全没有人,进入离岗确认
|
||||
self.state = self.STATE_CONFIRMING_OFF_DUTY
|
||||
self.state_start_time = current_time
|
||||
logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY")
|
||||
|
||||
elif self.state == self.STATE_CONFIRMING_OFF_DUTY:
|
||||
# 离岗确认中:需要持续未检测到人
|
||||
elapsed = (current_time - self.state_start_time).total_seconds()
|
||||
|
||||
if roi_has_person:
|
||||
# 人回来了,回到ON_DUTY
|
||||
self.state = self.STATE_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了)")
|
||||
elif elapsed >= self.confirm_off_duty_sec and detection_ratio == 0:
|
||||
# 离岗确认成功,进入倒计时
|
||||
self.state = self.STATE_OFF_DUTY_COUNTDOWN
|
||||
self.state_start_time = current_time
|
||||
self._leave_start_time = self.state_start_time # 记录离开时间
|
||||
logger.info(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → OFF_DUTY_COUNTDOWN (离岗确认成功)")
|
||||
|
||||
elif self.state == self.STATE_OFF_DUTY_COUNTDOWN:
|
||||
# 离岗倒计时中:等待告警触发
|
||||
elapsed = (current_time - self.state_start_time).total_seconds()
|
||||
|
||||
if roi_has_person:
|
||||
# 倒计时期间人回来了,回到ON_DUTY(未触发告警)
|
||||
self.state = self.STATE_ON_DUTY
|
||||
self.state_start_time = current_time
|
||||
self._leave_start_time = None
|
||||
logger.info(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ON_DUTY (倒计时期间回来)")
|
||||
elif elapsed >= self.leave_countdown_sec:
|
||||
# 倒计时结束,触发告警
|
||||
cooldown_key = f"{camera_id}_{roi_id}"
|
||||
if cooldown_key not in self.alert_cooldowns or \
|
||||
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
|
||||
|
||||
bbox = self._get_latest_bbox(tracks, roi_id)
|
||||
|
||||
alerts.append({
|
||||
"track_id": roi_id,
|
||||
"camera_id": camera_id,
|
||||
"bbox": bbox,
|
||||
"alert_type": "leave_post",
|
||||
"message": "人员离岗告警",
|
||||
"first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
})
|
||||
|
||||
self.alert_cooldowns[cooldown_key] = current_time
|
||||
self._alarm_triggered_time = current_time
|
||||
self.state = self.STATE_ALARMED
|
||||
# _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填
|
||||
logger.warning(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ALARMED (告警触发)")
|
||||
|
||||
elif self.state == self.STATE_ALARMED:
|
||||
# 已告警状态:等待人员回岗
|
||||
if roi_has_person:
|
||||
# 检测到人,进入回岗确认
|
||||
self.state = self.STATE_CONFIRMING_ON_DUTY # 复用上岗确认状态
|
||||
self.state_start_time = current_time
|
||||
logger.info(f"ROI {roi_id}: ALARMED → CONFIRMING_ON_DUTY (检测到人回岗)")
|
||||
|
||||
# 特殊处理:从CONFIRMING_ON_DUTY再次确认上岗时,如果有未结束的告警,发送resolve
|
||||
if self.state == self.STATE_ON_DUTY and self._last_alarm_id:
|
||||
# 回岗确认成功,发送resolve事件
|
||||
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
|
||||
alerts.append({
|
||||
"alert_type": "alarm_resolve",
|
||||
"resolve_alarm_id": self._last_alarm_id,
|
||||
"duration_ms": duration_ms,
|
||||
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
"resolve_type": "person_returned",
|
||||
})
|
||||
|
||||
# 清理告警追踪信息
|
||||
self._last_alarm_id = None
|
||||
self._leave_start_time = None
|
||||
self._alarm_triggered_time = None
|
||||
logger.info(f"ROI {roi_id}: 告警已解决(人员回岗)")
|
||||
|
||||
return alerts
|
||||
|
||||
@@ -247,22 +338,36 @@ class LeavePostAlgorithm:
|
||||
self._last_alarm_id = alarm_id
|
||||
|
||||
def reset(self):
|
||||
self.state = self.STATE_WAITING
|
||||
"""重置算法状态"""
|
||||
self.state = self.STATE_INIT
|
||||
self.state_start_time = None
|
||||
self.detection_history.clear()
|
||||
self.alarm_sent = False
|
||||
self.last_person_time = None
|
||||
self.alert_cooldowns.clear()
|
||||
self.detection_window.clear()
|
||||
self._last_alarm_id = None
|
||||
self._leave_start_time = None
|
||||
self._alarm_triggered_time = None
|
||||
self.alert_cooldowns.clear()
|
||||
|
||||
def get_state(self, roi_id: str) -> Dict[str, Any]:
|
||||
return {
|
||||
"""获取当前状态(用于调试和监控)"""
|
||||
state_info = {
|
||||
"state": self.state,
|
||||
"alarm_sent": self.alarm_sent,
|
||||
"last_person_time": self.last_person_time,
|
||||
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
|
||||
"detection_ratio": self._get_detection_ratio(),
|
||||
"window_size": len(self.detection_window),
|
||||
}
|
||||
|
||||
# 添加状态特定信息
|
||||
if self.state == self.STATE_OFF_DUTY_COUNTDOWN and self.state_start_time:
|
||||
elapsed = (datetime.now() - self.state_start_time).total_seconds()
|
||||
state_info["countdown_remaining_sec"] = max(0, self.leave_countdown_sec - elapsed)
|
||||
|
||||
if self.state == self.STATE_ALARMED and self._leave_start_time:
|
||||
total_off_duty_sec = (datetime.now() - self._leave_start_time).total_seconds()
|
||||
state_info["total_off_duty_sec"] = total_off_duty_sec
|
||||
state_info["alarm_id"] = self._last_alarm_id
|
||||
|
||||
return state_info
|
||||
|
||||
|
||||
class IntrusionAlgorithm:
|
||||
def __init__(
|
||||
@@ -583,6 +688,7 @@ class AlgorithmManager:
|
||||
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
|
||||
confirm_on_duty_sec=algo_params["confirm_on_duty_sec"],
|
||||
confirm_leave_sec=algo_params["confirm_leave_sec"],
|
||||
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟
|
||||
cooldown_sec=algo_params["cooldown_sec"],
|
||||
working_hours=algo_params["working_hours"],
|
||||
target_class=algo_params["target_class"],
|
||||
@@ -683,6 +789,7 @@ class AlgorithmManager:
|
||||
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
|
||||
confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10),
|
||||
confirm_leave_sec=algo_params.get("confirm_leave_sec", 30),
|
||||
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟
|
||||
cooldown_sec=algo_params.get("cooldown_sec", 600),
|
||||
working_hours=roi_working_hours,
|
||||
target_class=algo_params.get("target_class", "person"),
|
||||
|
||||
266
test_leave_post_full_workflow.py
Normal file
266
test_leave_post_full_workflow.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""
|
||||
离岗告警完整流程集成测试
|
||||
|
||||
测试完整业务流程:
|
||||
1. 人员上岗确认
|
||||
2. 人员离开ROI
|
||||
3. 离岗确认 → OFF_DUTY_COUNTDOWN
|
||||
4. 倒计时结束 → ALARMED (验证告警无 duration_minutes,有 first_frame_time)
|
||||
5. 创建告警记录 (验证 duration_ms=None)
|
||||
6. 人员返回 → CONFIRMING_ON_DUTY
|
||||
7. 回岗确认 → ON_DUTY (验证发送 resolve 事件)
|
||||
8. 更新告警记录 (验证 duration_ms 已填充)
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge')
|
||||
|
||||
from algorithms import LeavePostAlgorithm
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
def test_leave_post_full_workflow():
|
||||
"""测试离岗告警完整工作流"""
|
||||
print("=" * 80)
|
||||
print("Leave Post Alarm Full Workflow Integration Test")
|
||||
print("=" * 80)
|
||||
|
||||
algo = LeavePostAlgorithm(
|
||||
confirm_on_duty_sec=2, # 2秒上岗确认
|
||||
confirm_off_duty_sec=3, # 3秒离岗确认
|
||||
leave_countdown_sec=5, # 5秒离岗倒计时
|
||||
confirm_return_sec=2, # 2秒回岗确认
|
||||
)
|
||||
|
||||
roi_id = "test_roi_001"
|
||||
camera_id = "test_camera_001"
|
||||
person_track = [{"matched_rois": [{"roi_id": roi_id}], "class": "person", "bbox": [100, 100, 200, 200]}]
|
||||
|
||||
# 测试起始时间
|
||||
t0 = datetime(2026, 2, 13, 14, 0, 0)
|
||||
|
||||
# 存储告警数据(模拟数据库)
|
||||
alarm_db = {}
|
||||
|
||||
try:
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 1: Person On Duty Confirmation")
|
||||
print("=" * 80)
|
||||
print("Detecting person for 3 seconds (exceeds confirm_on_duty_sec=2s)")
|
||||
|
||||
for i in range(30): # 每100ms一帧,共3秒
|
||||
t = t0 + timedelta(milliseconds=i*100)
|
||||
alerts = algo.process(roi_id, camera_id, person_track, t)
|
||||
if alerts:
|
||||
print(f" [!] 意外告警: {alerts}")
|
||||
|
||||
print(f" State: {algo.state}")
|
||||
assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
|
||||
print(" [OK] Person on duty confirmed")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 2: Person Leaves ROI")
|
||||
print("=" * 80)
|
||||
print("Sending empty frames, waiting for window expiry (10s) + leave confirmation (3s)")
|
||||
|
||||
t_leave = t0 + timedelta(seconds=3)
|
||||
first_frame_time_expected = None
|
||||
|
||||
# 发送空帧让检测窗口清空,然后进入离岗确认
|
||||
for i in range(140): # 每100ms一帧,共14秒
|
||||
t = t_leave + timedelta(milliseconds=i*100)
|
||||
alerts = algo.process(roi_id, camera_id, [], t)
|
||||
|
||||
if algo.state == "CONFIRMING_OFF_DUTY" and first_frame_time_expected is None:
|
||||
print(f" State change detected: ON_DUTY -> CONFIRMING_OFF_DUTY (time: {t})")
|
||||
|
||||
if algo.state == "OFF_DUTY_COUNTDOWN":
|
||||
if first_frame_time_expected is None:
|
||||
first_frame_time_expected = t
|
||||
print(f" State change detected: CONFIRMING_OFF_DUTY -> OFF_DUTY_COUNTDOWN")
|
||||
print(f" Leave confirmed, recording leave time: {first_frame_time_expected}")
|
||||
break
|
||||
|
||||
print(f" State: {algo.state}")
|
||||
assert algo.state == "OFF_DUTY_COUNTDOWN", f"Expected OFF_DUTY_COUNTDOWN, got {algo.state}"
|
||||
print(" [OK] Leave confirmed, countdown started")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 3: Leave Countdown")
|
||||
print("=" * 80)
|
||||
print("Continuing empty frames, waiting for countdown (5s)")
|
||||
|
||||
t_countdown = t_leave + timedelta(seconds=14)
|
||||
alarm_triggered = None
|
||||
|
||||
for i in range(60): # 每100ms一帧,共6秒 (确保超过倒计时)
|
||||
t = t_countdown + timedelta(milliseconds=i*100)
|
||||
alerts = algo.process(roi_id, camera_id, [], t)
|
||||
|
||||
if alerts:
|
||||
for alert in alerts:
|
||||
if alert.get("alert_type") == "leave_post":
|
||||
alarm_triggered = alert
|
||||
print(f" [!] Alarm triggered: {t}")
|
||||
print(f" alert_type: {alert['alert_type']}")
|
||||
print(f" message: {alert['message']}")
|
||||
print(f" first_frame_time: {alert.get('first_frame_time')}")
|
||||
print(f" duration_minutes: {alert.get('duration_minutes', 'NOT_PRESENT')}")
|
||||
break
|
||||
|
||||
if algo.state == "ALARMED":
|
||||
print(f" State: {algo.state}")
|
||||
break
|
||||
|
||||
assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
|
||||
assert alarm_triggered is not None, "Alarm should be triggered"
|
||||
assert "duration_minutes" not in alarm_triggered, "Alarm should NOT contain duration_minutes"
|
||||
assert "first_frame_time" in alarm_triggered, "Alarm should contain first_frame_time"
|
||||
print(" [OK] Alarm triggered successfully, NO duration_minutes field")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 4: Create Alarm Record (Simulated DB)")
|
||||
print("=" * 80)
|
||||
# Simulate saving alarm to database
|
||||
import uuid
|
||||
alarm_id = f"ALM{datetime.now().strftime('%Y%m%d%H%M%S')}{uuid.uuid4().hex[:4].upper()}"
|
||||
|
||||
# Parse first_frame_time
|
||||
first_frame_time_str = alarm_triggered.get("first_frame_time")
|
||||
first_frame_time_dt = datetime.strptime(first_frame_time_str, '%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# Simulated alarm record
|
||||
alarm_db[alarm_id] = {
|
||||
"alarm_id": alarm_id,
|
||||
"alarm_type": "leave_post",
|
||||
"device_id": camera_id,
|
||||
"scene_id": roi_id,
|
||||
"event_time": t, # Alarm trigger time
|
||||
"first_frame_time": first_frame_time_dt, # Leave time
|
||||
"last_frame_time": None, # Not returned yet
|
||||
"duration_ms": None, # Duration unknown
|
||||
"alarm_status": "NEW",
|
||||
"handle_status": "UNHANDLED",
|
||||
}
|
||||
|
||||
# Set alarm_id in algorithm
|
||||
algo.set_last_alarm_id(alarm_id)
|
||||
|
||||
alarm = alarm_db[alarm_id]
|
||||
print(f" Alarm ID: {alarm_id}")
|
||||
print(f" event_time: {alarm['event_time']}")
|
||||
print(f" first_frame_time: {alarm['first_frame_time']}")
|
||||
print(f" last_frame_time: {alarm['last_frame_time']} (NULL)")
|
||||
print(f" duration_ms: {alarm['duration_ms']} (NULL)")
|
||||
assert alarm["duration_ms"] is None, "duration_ms should be None"
|
||||
assert alarm["last_frame_time"] is None, "last_frame_time should be None"
|
||||
print(" [OK] Alarm created, duration_ms=NULL, last_frame_time=NULL")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 5: Person Returns to ROI")
|
||||
print("=" * 80)
|
||||
print("Sending person detection frames, waiting for return confirmation (2s)")
|
||||
|
||||
t_return = t_countdown + timedelta(seconds=7) # Returned 1s after alarm
|
||||
|
||||
# Send enough person frames to fill detection window and exceed confirm_return_sec
|
||||
for i in range(130): # 100ms per frame, 13s total (way more than needed)
|
||||
t = t_return + timedelta(milliseconds=i*100)
|
||||
alerts = algo.process(roi_id, camera_id, person_track, t)
|
||||
|
||||
# Check for resolve event
|
||||
if alerts:
|
||||
for alert in alerts:
|
||||
if alert.get("alert_type") == "alarm_resolve":
|
||||
resolve_event = alert
|
||||
print(f" [!] Resolve event triggered: {t}")
|
||||
print(f" alert_type: {alert['alert_type']}")
|
||||
print(f" resolve_alarm_id: {alert['resolve_alarm_id']}")
|
||||
print(f" duration_ms: {alert['duration_ms']}")
|
||||
print(f" last_frame_time: {alert['last_frame_time']}")
|
||||
print(f" resolve_type: {alert['resolve_type']}")
|
||||
break
|
||||
|
||||
if algo.state == "CONFIRMING_ON_DUTY" and i < 10:
|
||||
if i == 0:
|
||||
print(f" State change detected: ALARMED -> CONFIRMING_ON_DUTY (time: {t})")
|
||||
|
||||
if algo.state == "ON_DUTY":
|
||||
print(f" State change detected: CONFIRMING_ON_DUTY -> ON_DUTY (time: {t})")
|
||||
print(f" State: {algo.state}")
|
||||
break
|
||||
|
||||
assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
|
||||
assert resolve_event is not None, "Resolve event should be triggered"
|
||||
assert resolve_event["resolve_alarm_id"] == alarm_id, "resolve_alarm_id mismatch"
|
||||
assert resolve_event["duration_ms"] > 0, "duration_ms should be > 0"
|
||||
assert resolve_event["resolve_type"] == "person_returned", "resolve_type should be person_returned"
|
||||
print(" [OK] Return confirmed, resolve event sent")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 7: Update Alarm Record (Simulated resolve_alarm)")
|
||||
print("=" * 80)
|
||||
|
||||
# Simulate resolve_alarm update
|
||||
alarm["duration_ms"] = resolve_event["duration_ms"]
|
||||
alarm["last_frame_time"] = datetime.strptime(resolve_event["last_frame_time"], '%Y-%m-%d %H:%M:%S')
|
||||
alarm["alarm_status"] = "CLOSED"
|
||||
alarm["handle_status"] = "DONE"
|
||||
alarm["handle_remark"] = "Person returned - auto closed"
|
||||
|
||||
print(f" Alarm ID: {alarm_id}")
|
||||
print(f" duration_ms: {alarm['duration_ms']} ({alarm['duration_ms']/1000:.1f}s)")
|
||||
print(f" last_frame_time: {alarm['last_frame_time']}")
|
||||
print(f" alarm_status: {alarm['alarm_status']}")
|
||||
print(f" handle_status: {alarm['handle_status']}")
|
||||
print(f" handle_remark: {alarm['handle_remark']}")
|
||||
|
||||
assert alarm["duration_ms"] is not None, "duration_ms should be filled"
|
||||
assert alarm["duration_ms"] > 0, "duration_ms should be > 0"
|
||||
assert alarm["last_frame_time"] is not None, "last_frame_time should be filled"
|
||||
assert alarm["alarm_status"] == "CLOSED", f"alarm_status should be CLOSED, got {alarm['alarm_status']}"
|
||||
assert alarm["handle_status"] == "DONE", f"handle_status should be DONE, got {alarm['handle_status']}"
|
||||
print(" [OK] Alarm record updated, duration_ms filled, status closed")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Stage 8: Verify Complete Workflow")
|
||||
print("=" * 80)
|
||||
|
||||
# Verify duration calculation
|
||||
expected_duration_ms = int((datetime.strptime(resolve_event["last_frame_time"], '%Y-%m-%d %H:%M:%S') - first_frame_time_dt).total_seconds() * 1000)
|
||||
actual_duration_ms = alarm["duration_ms"]
|
||||
|
||||
print(f" Expected duration_ms: {expected_duration_ms}ms")
|
||||
print(f" Actual duration_ms: {actual_duration_ms}ms")
|
||||
print(f" Difference: {abs(expected_duration_ms - actual_duration_ms)}ms")
|
||||
|
||||
# Allow 1 second error (1000ms)
|
||||
assert abs(expected_duration_ms - actual_duration_ms) <= 1000, "duration_ms calculation error too large"
|
||||
|
||||
print(" [OK] duration_ms calculation correct")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("SUCCESS: Full workflow test passed")
|
||||
print("=" * 80)
|
||||
print("\nWorkflow Summary:")
|
||||
print(f" 1. On duty time: {t0}")
|
||||
print(f" 2. Leave time: {first_frame_time_dt}")
|
||||
print(f" 3. Alarm time: {alarm['event_time']}")
|
||||
print(f" 4. Return time: {alarm['last_frame_time']}")
|
||||
print(f" 5. Leave duration: {alarm['duration_ms']/1000:.1f}s ({alarm['duration_ms']/60000:.1f}min)")
|
||||
print(f" 6. Alarm status: {alarm['alarm_status']}")
|
||||
print(f" 7. Handle status: {alarm['handle_status']}")
|
||||
|
||||
print("\n [OK] Test completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n" + "=" * 80)
|
||||
print(f"FAILED: Test failed")
|
||||
print("=" * 80)
|
||||
print(f"Error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_leave_post_full_workflow()
|
||||
62
test_leave_post_no_duration.py
Normal file
62
test_leave_post_no_duration.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import sys
|
||||
sys.path.insert(0, 'C:/Users/16337/PycharmProjects/ai_edge')
|
||||
|
||||
from algorithms import LeavePostAlgorithm
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
def test_alarm_trigger_no_duration():
|
||||
"""测试告警触发时不包含持续时长"""
|
||||
algo = LeavePostAlgorithm(
|
||||
confirm_on_duty_sec=1,
|
||||
confirm_off_duty_sec=1,
|
||||
leave_countdown_sec=2,
|
||||
)
|
||||
|
||||
# 滑动窗口大小是10秒,我们需要在整个流程中考虑这一点
|
||||
t0 = datetime(2026, 2, 12, 14, 0, 0)
|
||||
person_track = [{"matched_rois": [{"roi_id": "roi1"}], "class": "person"}]
|
||||
|
||||
# 1. 上岗阶段:持续检测到人 (2秒,超过confirm_on_duty_sec=1秒)
|
||||
for i in range(20): # 每100ms一帧,共2秒
|
||||
t = t0 + timedelta(milliseconds=i*100)
|
||||
algo.process("roi1", "cam1", person_track, t)
|
||||
|
||||
print(f"[1] 上岗后状态: {algo.state}")
|
||||
assert algo.state == "ON_DUTY", f"Expected ON_DUTY, got {algo.state}"
|
||||
|
||||
# 2. 离开阶段:等待11秒让窗口内的人检测完全过期,然后再发空帧
|
||||
# 关键:需要等待窗口过期(10秒) + 确认时间(1秒)
|
||||
t_leave_start = t0 + timedelta(seconds=2)
|
||||
|
||||
# 发送空帧超过窗口大小,确保detection_ratio变为0
|
||||
for i in range(120): # 每100ms一帧,共12秒
|
||||
t = t_leave_start + timedelta(milliseconds=i*100)
|
||||
algo.process("roi1", "cam1", [], t)
|
||||
|
||||
print(f"[2] 离开12秒后状态: {algo.state}")
|
||||
|
||||
# 3. 继续发送空帧,等待倒计时结束 (leave_countdown_sec=2秒)
|
||||
t_wait_alarm = t_leave_start + timedelta(seconds=12)
|
||||
for i in range(30): # 每100ms一帧,共3秒 (超过倒计时)
|
||||
t = t_wait_alarm + timedelta(milliseconds=i*100)
|
||||
alerts = algo.process("roi1", "cam1", [], t)
|
||||
if alerts:
|
||||
break
|
||||
|
||||
print(f"[3] 告警触发后状态: {algo.state}")
|
||||
print(f" 告警数量: {len(alerts)}")
|
||||
print(f" 告警内容: {alerts}")
|
||||
|
||||
# ✅ 验证:告警不包含 duration_minutes
|
||||
assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}"
|
||||
alarm = alerts[0]
|
||||
assert alarm["alert_type"] == "leave_post"
|
||||
assert "duration_minutes" not in alarm, "告警不应包含 duration_minutes"
|
||||
assert "first_frame_time" in alarm, "告警应包含 first_frame_time"
|
||||
assert alarm["message"] == "人员离岗告警", f"消息错误: {alarm['message']}"
|
||||
|
||||
print(f"测试通过:告警不含持续时长")
|
||||
print(f" first_frame_time: {alarm['first_frame_time']}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_alarm_trigger_no_duration()
|
||||
Reference in New Issue
Block a user