diff --git a/algorithms.py b/algorithms.py index 589e053..6f3ed7f 100644 --- a/algorithms.py +++ b/algorithms.py @@ -17,59 +17,86 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) class LeavePostAlgorithm: - STATE_WAITING = "WAITING" - STATE_CONFIRMING = "CONFIRMING" - STATE_ON_DUTY = "ON_DUTY" - STATE_LEAVING = "LEAVING" - STATE_OFF_DUTY = "OFF_DUTY" - STATE_NON_WORK_TIME = "NON_WORK_TIME" + """ + 离岗检测算法(优化版 v2.0) + + 状态机: + INIT → CONFIRMING_ON_DUTY → ON_DUTY → CONFIRMING_OFF_DUTY + → OFF_DUTY_COUNTDOWN → ALARMED → (回岗) → CONFIRMING_ON_DUTY + + 业务流程: + 1. 启动后检测到人 → 上岗确认期(confirm_on_duty_sec,默认10秒) + 2. 确认上岗后 → 在岗状态(ON_DUTY) + 3. 人离开ROI → 离岗确认期(confirm_off_duty_sec,默认30秒) + 4. 确认离岗后 → 离岗倒计时(leave_countdown_sec,默认300秒) + 5. 倒计时结束 → 触发告警(ALARMED状态) + 6. 人员回岗 → 回岗确认期(confirm_return_sec,默认10秒) + 7. 确认回岗 → 发送resolve事件 → 重新上岗确认 + """ + + # 状态定义 + STATE_INIT = "INIT" # 初始化 + STATE_CONFIRMING_ON_DUTY = "CONFIRMING_ON_DUTY" # 上岗确认中 + STATE_ON_DUTY = "ON_DUTY" # 在岗 + STATE_CONFIRMING_OFF_DUTY = "CONFIRMING_OFF_DUTY" # 离岗确认中 + STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" # 离岗倒计时中 + STATE_ALARMED = "ALARMED" # 已告警(等待回岗) + STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间 def __init__( self, - confirm_on_duty_sec: int = 10, - confirm_leave_sec: int = 30, - leave_countdown_sec: int = 300, # 新增:离岗倒计时(默认5分钟) - cooldown_sec: int = 600, + confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长) + confirm_off_duty_sec: int = 30, # 离岗确认窗口(持续未检测到人的时长) + confirm_return_sec: int = 10, # 回岗确认窗口(告警后回来需确认时长) + leave_countdown_sec: int = 300, # 离岗倒计时(确认离岗后等待告警的时长) + cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔) working_hours: Optional[List[Dict]] = None, target_class: Optional[str] = "person", + # 兼容旧参数名(向后兼容) + confirm_leave_sec: Optional[int] = None, ): + # 时间参数(处理向后兼容) self.confirm_on_duty_sec = confirm_on_duty_sec - self.confirm_leave_sec = confirm_leave_sec - self.leave_countdown_sec = leave_countdown_sec # 离岗倒计时 + self.confirm_off_duty_sec = confirm_leave_sec if confirm_leave_sec is not None else confirm_off_duty_sec + self.confirm_return_sec = confirm_return_sec + self.leave_countdown_sec = leave_countdown_sec self.cooldown_sec = cooldown_sec + + # 工作时间和目标类别 self.working_hours = working_hours or [] self.target_class = target_class + # 状态变量 + self.state: str = self.STATE_INIT + self.state_start_time: Optional[datetime] = None + + # 滑动窗口(用于平滑检测结果) + self.detection_window: deque = deque() # [(timestamp, has_person), ...] + self.window_size_sec = 10 # 滑动窗口大小:10秒 + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._leave_start_time: Optional[datetime] = None # 人员离开时间(用于计算持续时长) + self._alarm_triggered_time: Optional[datetime] = None # 告警触发时间 self.alert_cooldowns: Dict[str, datetime] = {} - self.state: str = self.STATE_WAITING - self.state_start_time: Optional[datetime] = None - self.detection_history: deque = deque() - - self.alarm_sent: bool = False - self.last_person_time: Optional[datetime] = None - - self._last_alarm_id: Optional[str] = None # 最近一次告警ID - self._off_duty_start_time: Optional[datetime] = None # OFF_DUTY状态开始时间(用于倒计时) - self._alarm_triggered: bool = False # 是否已触发告警(OFF_DUTY期间只告警一次) - self._leave_start_time: Optional[datetime] = None # 离岗开始时间(LEAVING 状态的开始时间) - def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: + """检查是否在工作时间""" if not self.working_hours: return True - + import json - + working_hours = self.working_hours if isinstance(working_hours, str): try: working_hours = json.loads(working_hours) except: return True - + if not working_hours: return True - + dt = dt or datetime.now() current_minutes = dt.hour * 60 + dt.minute for period in working_hours: @@ -80,7 +107,7 @@ class LeavePostAlgorithm: if start_minutes <= current_minutes < end_minutes: return True return False - + def _parse_time_to_minutes(self, time_str: str) -> int: """将时间字符串转换为分钟数""" if isinstance(time_str, int): @@ -92,31 +119,43 @@ class LeavePostAlgorithm: return 0 def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + """检查检测结果是否在ROI内""" matched_rois = detection.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True - return False + return any(roi.get("roi_id") == roi_id for roi in matched_rois) - def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: + def _check_target_class(self, detection: Dict, target_class: str) -> bool: + """检查是否为目标类别""" if not target_class: return True return detection.get("class") == target_class - def _get_detection_window(self, current_time: datetime) -> List[bool]: - detections = [] - while self.detection_history and (current_time - self.detection_history[0][0]).total_seconds() > max(self.confirm_on_duty_sec, self.confirm_leave_sec): - self.detection_history.popleft() - for _, has_person in self.detection_history: - detections.append(has_person) - return detections - def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + """获取ROI内最新的检测框""" for det in tracks: - if self._check_detection_in_roi(det, roi_id): + if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): return det.get("bbox", []) return [] + def _update_detection_window(self, current_time: datetime, has_person: bool): + """更新滑动窗口""" + self.detection_window.append((current_time, has_person)) + + # 移除窗口外的旧数据 + while self.detection_window: + oldest_time, _ = self.detection_window[0] + if (current_time - oldest_time).total_seconds() > self.window_size_sec: + self.detection_window.popleft() + else: + break + + def _get_detection_ratio(self) -> float: + """计算滑动窗口内的检测命中率""" + if not self.detection_window: + return 0.0 + + person_count = sum(1 for _, has_person in self.detection_window if has_person) + return person_count / len(self.detection_window) + def process( self, roi_id: str, @@ -124,125 +163,135 @@ class LeavePostAlgorithm: tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: - current_time = current_time or datetime.now() + """ + 处理单帧检测结果 - in_work = self._is_in_working_hours(current_time) + Args: + roi_id: ROI区域ID + camera_id: 摄像头ID + tracks: 检测结果列表 + current_time: 当前时间(用于测试,生产环境传None) + + Returns: + 告警列表 [{"alert_type": "leave_post", ...}, {"alert_type": "alarm_resolve", ...}] + """ + current_time = current_time or datetime.now() alerts = [] - if not in_work: - if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time: - duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) - alerts.append({ - "alert_type": "alarm_resolve", - "resolve_alarm_id": self._last_alarm_id, - "duration_ms": duration_ms, - "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), - "resolve_type": "non_work_time", - }) - self._last_alarm_id = None - self._leave_start_time = None - self.state = self.STATE_NON_WORK_TIME - self.detection_history.clear() - self.alarm_sent = False - return alerts + # 检查ROI内是否有目标 + roi_has_person = any( + self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class) + for det in tracks + ) - if self.state == self.STATE_NON_WORK_TIME: - self.state = self.STATE_WAITING - self.state_start_time = None - self.detection_history.clear() - self.alarm_sent = False - self.alert_cooldowns.clear() # 新工作时段,清除冷却记录 + # 更新滑动窗口 + self._update_detection_window(current_time, roi_has_person) + detection_ratio = self._get_detection_ratio() - roi_has_person = False - for det in tracks: - if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): - roi_has_person = True - break + # 检查工作时间 + in_working_hours = self._is_in_working_hours(current_time) - if self.state == self.STATE_WAITING: - if roi_has_person: - # 检测到人,进入上岗确认阶段 - self.state = self.STATE_CONFIRMING - self.state_start_time = current_time - self.detection_history.clear() - self.detection_history.append((current_time, True)) - else: - pass - - elif self.state == self.STATE_CONFIRMING: - self.detection_history.append((current_time, roi_has_person)) - if not roi_has_person: - # 人消失,回到等待状态 - self.state = self.STATE_WAITING - self.state_start_time = None - self.detection_history.clear() - else: - elapsed = (current_time - self.state_start_time).total_seconds() - if elapsed >= self.confirm_on_duty_sec: - # 持续在岗达到确认时长,正式确认上岗 - self.state = self.STATE_ON_DUTY - self.state_start_time = current_time - # 确认在岗后清除冷却记录,允许新一轮离岗检测告警 - self.alert_cooldowns.clear() - - elif self.state == self.STATE_ON_DUTY: - self.detection_history.append((current_time, roi_has_person)) - if not roi_has_person: - self.state = self.STATE_LEAVING - self.state_start_time = current_time - - elif self.state == self.STATE_LEAVING: - self.detection_history.append((current_time, roi_has_person)) - elapsed = (current_time - self.state_start_time).total_seconds() - - if roi_has_person: - # 人回来了,回到ON_DUTY - self.state = self.STATE_ON_DUTY - self.state_start_time = current_time - elif elapsed >= self.confirm_leave_sec: - # 离岗确认期结束 → 进入OFF_DUTY开始倒计时(暂不触发告警) - leaving_start_time = self.state_start_time # 保存人员离开时间 - self.state = self.STATE_OFF_DUTY - self.state_start_time = current_time - self._off_duty_start_time = current_time # 记录OFF_DUTY开始时间用于倒计时 - self._leave_start_time = leaving_start_time # 保存LEAVING开始时间(人员离开时间) - self._alarm_triggered = False # 重置告警标志 - - elif self.state == self.STATE_OFF_DUTY: - # OFF_DUTY 状态:离岗倒计时 + 等待人员回岗 - off_duty_elapsed = (current_time - self._off_duty_start_time).total_seconds() if self._off_duty_start_time else 0 - - if roi_has_person: - # 人员回岗 - if self._last_alarm_id and self._leave_start_time and self._alarm_triggered: - # 如果已经触发过告警,发送resolve事件 + # === 非工作时间处理 === + if not in_working_hours: + if self.state != self.STATE_NON_WORK_TIME: + # 进入非工作时间,清理状态 + if self._last_alarm_id and self._leave_start_time: + # 如果有未结束的告警,发送resolve事件(非工作时间自动关闭) duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), - "resolve_type": "person_returned", + "resolve_type": "non_work_time", }) - # 清理告警追踪信息 + + self.state = self.STATE_NON_WORK_TIME + self.state_start_time = None + self.detection_window.clear() self._last_alarm_id = None self._leave_start_time = None - self._off_duty_start_time = None - self._alarm_triggered = False - # 回到确认在岗状态 - self.state = self.STATE_CONFIRMING + self._alarm_triggered_time = None + return alerts + + # === 工作时间处理 === + + # 从非工作时间恢复 + if self.state == self.STATE_NON_WORK_TIME: + self.state = self.STATE_INIT + self.state_start_time = current_time + self.detection_window.clear() + + # === 状态机处理 === + + if self.state == self.STATE_INIT: + # 初始化状态:等待检测到人 + if roi_has_person: + self.state = self.STATE_CONFIRMING_ON_DUTY self.state_start_time = current_time - self.detection_history.clear() - self.detection_history.append((current_time, True)) - elif off_duty_elapsed >= self.leave_countdown_sec and not self._alarm_triggered: - # 离岗倒计时结束,触发告警 + logger.debug(f"ROI {roi_id}: INIT → CONFIRMING_ON_DUTY") + + elif self.state == self.STATE_CONFIRMING_ON_DUTY: + # 上岗确认中:需要持续检测到人 + elapsed = (current_time - self.state_start_time).total_seconds() + + if detection_ratio == 0: + # 人消失了,回到INIT + self.state = self.STATE_INIT + self.state_start_time = current_time + self.detection_window.clear() + logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)") + elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.7: + # 上岗确认成功(命中率>=70%) + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + self.alert_cooldowns.clear() # 确认在岗后清除冷却记录 + logger.info(f"ROI {roi_id}: CONFIRMING_ON_DUTY → ON_DUTY (上岗确认成功)") + + elif self.state == self.STATE_ON_DUTY: + # 在岗状态:监控是否离岗 + if detection_ratio == 0: + # 滑动窗口内完全没有人,进入离岗确认 + self.state = self.STATE_CONFIRMING_OFF_DUTY + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY") + + elif self.state == self.STATE_CONFIRMING_OFF_DUTY: + # 离岗确认中:需要持续未检测到人 + elapsed = (current_time - self.state_start_time).total_seconds() + + if roi_has_person: + # 人回来了,回到ON_DUTY + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了)") + elif elapsed >= self.confirm_off_duty_sec and detection_ratio == 0: + # 离岗确认成功,进入倒计时 + self.state = self.STATE_OFF_DUTY_COUNTDOWN + self.state_start_time = current_time + self._leave_start_time = self.state_start_time # 记录离开时间 + logger.info(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → OFF_DUTY_COUNTDOWN (离岗确认成功)") + + elif self.state == self.STATE_OFF_DUTY_COUNTDOWN: + # 离岗倒计时中:等待告警触发 + elapsed = (current_time - self.state_start_time).total_seconds() + + if roi_has_person: + # 倒计时期间人回来了,回到ON_DUTY(未触发告警) + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + self._leave_start_time = None + logger.info(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ON_DUTY (倒计时期间回来)") + elif elapsed >= self.leave_countdown_sec: + # 倒计时结束,触发告警 cooldown_key = f"{camera_id}_{roi_id}" - now = datetime.now() - if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + bbox = self._get_latest_bbox(tracks, roi_id) - # 计算总离岗时长(从人离开到现在) - total_leave_duration = (current_time - self._leave_start_time).total_seconds() if self._leave_start_time else 0 - elapsed_minutes = int(total_leave_duration / 60) + total_off_duty_sec = (current_time - self._leave_start_time).total_seconds() + elapsed_minutes = int(total_off_duty_sec / 60) + alerts.append({ "track_id": roi_id, "camera_id": camera_id, @@ -251,9 +300,38 @@ class LeavePostAlgorithm: "alert_type": "leave_post", "message": f"离岗 {elapsed_minutes} 分钟", }) - self.alert_cooldowns[cooldown_key] = now - self._alarm_triggered = True # 标记已触发告警 - # alarm_id 由 main.py 通过 set_last_alarm_id() 回填 + + self.alert_cooldowns[cooldown_key] = current_time + self._alarm_triggered_time = current_time + self.state = self.STATE_ALARMED + # _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填 + logger.warning(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ALARMED (告警触发)") + + elif self.state == self.STATE_ALARMED: + # 已告警状态:等待人员回岗 + if roi_has_person: + # 检测到人,进入回岗确认 + self.state = self.STATE_CONFIRMING_ON_DUTY # 复用上岗确认状态 + self.state_start_time = current_time + logger.info(f"ROI {roi_id}: ALARMED → CONFIRMING_ON_DUTY (检测到人回岗)") + + # 特殊处理:从CONFIRMING_ON_DUTY再次确认上岗时,如果有未结束的告警,发送resolve + if self.state == self.STATE_ON_DUTY and self._last_alarm_id: + # 回岗确认成功,发送resolve事件 + duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "person_returned", + }) + + # 清理告警追踪信息 + self._last_alarm_id = None + self._leave_start_time = None + self._alarm_triggered_time = None + logger.info(f"ROI {roi_id}: 告警已解决(人员回岗)") return alerts @@ -262,24 +340,36 @@ class LeavePostAlgorithm: self._last_alarm_id = alarm_id def reset(self): - self.state = self.STATE_WAITING + """重置算法状态""" + self.state = self.STATE_INIT self.state_start_time = None - self.detection_history.clear() - self.alarm_sent = False - self.last_person_time = None - self.alert_cooldowns.clear() + self.detection_window.clear() self._last_alarm_id = None self._leave_start_time = None - self._off_duty_start_time = None - self._alarm_triggered = False + self._alarm_triggered_time = None + self.alert_cooldowns.clear() def get_state(self, roi_id: str) -> Dict[str, Any]: - return { + """获取当前状态(用于调试和监控)""" + state_info = { "state": self.state, - "alarm_sent": self.alarm_sent, - "last_person_time": self.last_person_time, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + "detection_ratio": self._get_detection_ratio(), + "window_size": len(self.detection_window), } + # 添加状态特定信息 + if self.state == self.STATE_OFF_DUTY_COUNTDOWN and self.state_start_time: + elapsed = (datetime.now() - self.state_start_time).total_seconds() + state_info["countdown_remaining_sec"] = max(0, self.leave_countdown_sec - elapsed) + + if self.state == self.STATE_ALARMED and self._leave_start_time: + total_off_duty_sec = (datetime.now() - self._leave_start_time).total_seconds() + state_info["total_off_duty_sec"] = total_off_duty_sec + state_info["alarm_id"] = self._last_alarm_id + + return state_info + class IntrusionAlgorithm: def __init__(