diff --git a/algorithms.py b/algorithms.py index b1f0fa4..589e053 100644 --- a/algorithms.py +++ b/algorithms.py @@ -28,12 +28,14 @@ class LeavePostAlgorithm: self, confirm_on_duty_sec: int = 10, confirm_leave_sec: int = 30, + leave_countdown_sec: int = 300, # 新增:离岗倒计时(默认5分钟) cooldown_sec: int = 600, working_hours: Optional[List[Dict]] = None, target_class: Optional[str] = "person", ): self.confirm_on_duty_sec = confirm_on_duty_sec self.confirm_leave_sec = confirm_leave_sec + self.leave_countdown_sec = leave_countdown_sec # 离岗倒计时 self.cooldown_sec = cooldown_sec self.working_hours = working_hours or [] self.target_class = target_class @@ -48,6 +50,8 @@ class LeavePostAlgorithm: self.last_person_time: Optional[datetime] = None self._last_alarm_id: Optional[str] = None # 最近一次告警ID + self._off_duty_start_time: Optional[datetime] = None # OFF_DUTY状态开始时间(用于倒计时) + self._alarm_triggered: bool = False # 是否已触发告警(OFF_DUTY期间只告警一次) self._leave_start_time: Optional[datetime] = None # 离岗开始时间(LEAVING 状态的开始时间) def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: @@ -192,19 +196,53 @@ class LeavePostAlgorithm: elapsed = (current_time - self.state_start_time).total_seconds() if roi_has_person: + # 人回来了,回到ON_DUTY self.state = self.STATE_ON_DUTY self.state_start_time = current_time elif elapsed >= self.confirm_leave_sec: - # 确认离岗后直接触发告警,不再进入 OFF_DUTY 二次等待 - leaving_start_time = self.state_start_time # 保存 LEAVING 状态开始时间(人员离开时间) + # 离岗确认期结束 → 进入OFF_DUTY开始倒计时(暂不触发告警) + leaving_start_time = self.state_start_time # 保存人员离开时间 self.state = self.STATE_OFF_DUTY self.state_start_time = current_time + self._off_duty_start_time = current_time # 记录OFF_DUTY开始时间用于倒计时 + self._leave_start_time = leaving_start_time # 保存LEAVING开始时间(人员离开时间) + self._alarm_triggered = False # 重置告警标志 + elif self.state == self.STATE_OFF_DUTY: + # OFF_DUTY 状态:离岗倒计时 + 等待人员回岗 + off_duty_elapsed = (current_time - self._off_duty_start_time).total_seconds() if self._off_duty_start_time else 0 + + if roi_has_person: + # 人员回岗 + if self._last_alarm_id and self._leave_start_time and self._alarm_triggered: + # 如果已经触发过告警,发送resolve事件 + duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "person_returned", + }) + # 清理告警追踪信息 + self._last_alarm_id = None + self._leave_start_time = None + self._off_duty_start_time = None + self._alarm_triggered = False + # 回到确认在岗状态 + self.state = self.STATE_CONFIRMING + self.state_start_time = current_time + self.detection_history.clear() + self.detection_history.append((current_time, True)) + elif off_duty_elapsed >= self.leave_countdown_sec and not self._alarm_triggered: + # 离岗倒计时结束,触发告警 cooldown_key = f"{camera_id}_{roi_id}" now = datetime.now() if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: bbox = self._get_latest_bbox(tracks, roi_id) - elapsed_minutes = int(elapsed / 60) + # 计算总离岗时长(从人离开到现在) + total_leave_duration = (current_time - self._leave_start_time).total_seconds() if self._leave_start_time else 0 + elapsed_minutes = int(total_leave_duration / 60) alerts.append({ "track_id": roi_id, "camera_id": camera_id, @@ -214,31 +252,8 @@ class LeavePostAlgorithm: "message": f"离岗 {elapsed_minutes} 分钟", }) self.alert_cooldowns[cooldown_key] = now - - # 保存告警追踪信息(alarm_id 由 main.py 通过 set_last_alarm_id() 回填) - self._last_alarm_id = None - self._leave_start_time = leaving_start_time # LEAVING 状态开始时间 = 人员离开时间 - - elif self.state == self.STATE_OFF_DUTY: - # OFF_DUTY 状态:只等待人员回岗,不再重复告警 - # 必须经过 CONFIRMING 重新确认在岗后,才允许新一轮离岗检测 - if roi_has_person: - # 生成 resolve 事件(人员回岗) - if self._last_alarm_id and self._leave_start_time: - duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) - alerts.append({ - "alert_type": "alarm_resolve", - "resolve_alarm_id": self._last_alarm_id, - "duration_ms": duration_ms, - "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), - "resolve_type": "person_returned", - }) - self._last_alarm_id = None - self._leave_start_time = None - self.state = self.STATE_CONFIRMING - self.state_start_time = current_time - self.detection_history.clear() - self.detection_history.append((current_time, True)) + self._alarm_triggered = True # 标记已触发告警 + # alarm_id 由 main.py 通过 set_last_alarm_id() 回填 return alerts @@ -255,6 +270,8 @@ class LeavePostAlgorithm: self.alert_cooldowns.clear() self._last_alarm_id = None self._leave_start_time = None + self._off_duty_start_time = None + self._alarm_triggered = False def get_state(self, roi_id: str) -> Dict[str, Any]: return { @@ -583,6 +600,7 @@ class AlgorithmManager: self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params["confirm_on_duty_sec"], confirm_leave_sec=algo_params["confirm_leave_sec"], + leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟 cooldown_sec=algo_params["cooldown_sec"], working_hours=algo_params["working_hours"], target_class=algo_params["target_class"], @@ -683,6 +701,7 @@ class AlgorithmManager: self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10), confirm_leave_sec=algo_params.get("confirm_leave_sec", 30), + leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟 cooldown_sec=algo_params.get("cooldown_sec", 600), working_hours=roi_working_hours, target_class=algo_params.get("target_class", "person"),