diff --git a/algorithms.py b/algorithms.py index ae3e2d8..ade3d65 100644 --- a/algorithms.py +++ b/algorithms.py @@ -370,6 +370,25 @@ class LeavePostAlgorithm: class IntrusionAlgorithm: + """ + 周界入侵检测算法(状态机版本 v2.0) + + 状态机: + IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE + + 业务流程: + 1. 检测到人 → 入侵确认期(confirm_seconds,默认5秒) + 2. 确认入侵 → 触发告警(ALARMED状态) + 3. 人离开ROI → 入侵消失确认期(confirm_seconds,默认5秒) + 4. 确认消失 → 发送resolve事件 → 回到空闲状态 + """ + + # 状态定义 + STATE_IDLE = "IDLE" # 空闲(无入侵) + STATE_CONFIRMING_INTRUSION = "CONFIRMING_INTRUSION" # 入侵确认中 + STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失) + STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中 + def __init__( self, cooldown_seconds: int = 300, @@ -380,6 +399,18 @@ class IntrusionAlgorithm: self.confirm_seconds = confirm_seconds self.target_class = target_class + # 状态变量 + self.state: str = self.STATE_IDLE + self.state_start_time: Optional[datetime] = None + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._intrusion_start_time: Optional[datetime] = None + + # 冷却期管理 + self.alert_cooldowns: Dict[str, datetime] = {} + + # 向后兼容:保留旧变量(不再使用) self.last_alert_time: Dict[str, datetime] = {} self.alert_triggered: Dict[str, bool] = {} self.detection_start: Dict[str, Optional[datetime]] = {} @@ -409,52 +440,145 @@ class IntrusionAlgorithm: tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: + """ + 处理单帧检测结果 + + Args: + roi_id: ROI区域ID + camera_id: 摄像头ID + tracks: 检测结果列表 + current_time: 当前时间(用于测试,生产环境传None) + + Returns: + 告警列表 [{"alert_type": "intrusion", ...}, {"alert_type": "alarm_resolve", ...}] + """ current_time = current_time or datetime.now() - key = f"{camera_id}_{roi_id}" + alerts = [] - roi_has_person = False - for det in tracks: - if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): - roi_has_person = True - break + # 检查ROI内是否有目标 + roi_has_person = any( + self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class) + for det in tracks + ) - if not roi_has_person: - self.detection_start.pop(key, None) - self.alert_triggered[key] = False - return [] + # === 状态机处理 === - if self.alert_triggered.get(key, False): - elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds() - if elapsed_since_alert < self.cooldown_seconds: - return [] - self.alert_triggered[key] = False + if self.state == self.STATE_IDLE: + # 空闲状态:等待检测到入侵 + if roi_has_person: + self.state = self.STATE_CONFIRMING_INTRUSION + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_INTRUSION") - if self.detection_start.get(key) is None: - self.detection_start[key] = current_time + elif self.state == self.STATE_CONFIRMING_INTRUSION: + # 入侵确认中:需要持续检测到人 + elapsed = (current_time - self.state_start_time).total_seconds() - elapsed = (current_time - self.detection_start[key]).total_seconds() - if elapsed < self.confirm_seconds: - return [] + if not roi_has_person: + # 人消失了,回到IDLE + self.state = self.STATE_IDLE + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)") + elif elapsed >= self.confirm_seconds: + # 入侵确认成功,检查冷却期 + cooldown_key = f"{camera_id}_{roi_id}" + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_seconds: - bbox = self._get_latest_bbox(tracks, roi_id) - self.last_alert_time[key] = current_time - self.alert_triggered[key] = True - self.detection_start[key] = None + bbox = self._get_latest_bbox(tracks, roi_id) + self._intrusion_start_time = self.state_start_time # 记录入侵开始时间 - return [{ - "roi_id": roi_id, - "camera_id": camera_id, - "bbox": bbox, - "alert_type": "intrusion", - "alarm_level": 3, - "message": "检测到周界入侵", - }] + alerts.append({ + "roi_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "alert_type": "intrusion", + "alarm_level": 3, + "message": "检测到周界入侵", + "first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'), + }) + + self.alert_cooldowns[cooldown_key] = current_time + self.state = self.STATE_ALARMED + # _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填 + logger.warning(f"ROI {roi_id}: CONFIRMING_INTRUSION → ALARMED (告警触发)") + else: + # 冷却期内,回到IDLE + self.state = self.STATE_IDLE + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (冷却期内)") + + elif self.state == self.STATE_ALARMED: + # 已告警状态:等待入侵消失 + if not roi_has_person: + # 检测到无人,进入消失确认 + self.state = self.STATE_CONFIRMING_CLEAR + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR") + + elif self.state == self.STATE_CONFIRMING_CLEAR: + # 入侵消失确认中:需要持续未检测到人 + elapsed = (current_time - self.state_start_time).total_seconds() + + if roi_has_person: + # 人又出现了,回到ALARMED + self.state = self.STATE_ALARMED + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (人又出现)") + elif elapsed >= self.confirm_seconds: + # 消失确认成功,发送resolve事件 + if self._last_alarm_id and self._intrusion_start_time: + duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "intrusion_cleared", + }) + + logger.info(f"ROI {roi_id}: 告警已解决(入侵消失)") + + # 重置状态 + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._intrusion_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)") + + return alerts + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id def reset(self): + """重置算法状态""" + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._intrusion_start_time = None + self.alert_cooldowns.clear() + # 向后兼容 self.last_alert_time.clear() self.alert_triggered.clear() self.detection_start.clear() + def get_state(self, roi_id: str) -> Dict[str, Any]: + """获取当前状态(用于调试和监控)""" + state_info = { + "state": self.state, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + } + + # 添加状态特定信息 + if self.state == self.STATE_ALARMED and self._intrusion_start_time: + total_intrusion_sec = (datetime.now() - self._intrusion_start_time).total_seconds() + state_info["total_intrusion_sec"] = total_intrusion_sec + state_info["alarm_id"] = self._last_alarm_id + + return state_info + # class CrowdDetectionAlgorithm: # """人群聚集检测算法 - 暂时注释,后续需要时再启用"""