feat(intrusion): 拆分参数,消失确认3分钟,持续有人5秒才重置

- 拆分confirm_seconds为confirm_intrusion_seconds(5秒)和confirm_clear_seconds(180秒)
- 入侵确认:持续检测到人5秒 → 触发告警
- 消失确认:持续无人180秒 → 自动resolve
- 消失确认期间逻辑:
  - 短暂有人(<5秒):继续倒计时
  - 持续有人(≥5秒):回到ALARMED状态
- 向后兼容:保留confirm_seconds参数
This commit is contained in:
2026-02-25 09:12:56 +08:00
parent 7b3265fe74
commit 75ea66c452

View File

@@ -371,16 +371,19 @@ class LeavePostAlgorithm:
class IntrusionAlgorithm: class IntrusionAlgorithm:
""" """
周界入侵检测算法(状态机版本 v2.0 周界入侵检测算法(状态机版本 v3.0
状态机: 状态机:
IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE
业务流程: 业务流程:
1. 检测到人 → 入侵确认期confirm_seconds默认5秒 1. 检测到人 → 入侵确认期confirm_intrusion_seconds默认5秒
2. 确认入侵 → 触发告警ALARMED状态 2. 确认入侵 → 触发告警ALARMED状态
3. 人离开ROI → 入侵消失确认期confirm_seconds默认5秒) 3. 人离开ROI → 入侵消失确认期confirm_clear_seconds默认180秒)
4. 确认消失 → 发送resolve事件 → 回到空闲状态 4. 消失确认期间:
- 短暂有人(<5秒继续倒计时
- 持续有人≥5秒回到ALARMED状态
5. 确认消失持续无人180秒→ 发送resolve事件 → 回到空闲状态
""" """
# 状态定义 # 状态定义
@@ -395,11 +398,20 @@ class IntrusionAlgorithm:
def __init__( def __init__(
self, self,
cooldown_seconds: int = 300, cooldown_seconds: int = 300,
confirm_seconds: int = 5, confirm_seconds: int = 5, # 向后兼容:同时设置入侵和消失确认时间
confirm_intrusion_seconds: Optional[int] = None, # 入侵确认时间默认5秒
confirm_clear_seconds: Optional[int] = None, # 消失确认时间默认180秒
target_class: Optional[str] = None, target_class: Optional[str] = None,
): ):
self.cooldown_seconds = cooldown_seconds self.cooldown_seconds = cooldown_seconds
# 参数兼容处理
self.confirm_intrusion_seconds = confirm_intrusion_seconds if confirm_intrusion_seconds is not None else confirm_seconds
self.confirm_clear_seconds = confirm_clear_seconds if confirm_clear_seconds is not None else 180
# 向后兼容
self.confirm_seconds = confirm_seconds self.confirm_seconds = confirm_seconds
self.target_class = target_class self.target_class = target_class
# 状态变量 # 状态变量
@@ -410,6 +422,9 @@ class IntrusionAlgorithm:
self._last_alarm_id: Optional[str] = None self._last_alarm_id: Optional[str] = None
self._intrusion_start_time: Optional[datetime] = None self._intrusion_start_time: Optional[datetime] = None
# CONFIRMING_CLEAR状态下检测到人的时间用于判断是否持续5秒
self._person_detected_in_clear_time: Optional[datetime] = None
# 冷却期管理 # 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {} self.alert_cooldowns: Dict[str, datetime] = {}
@@ -487,7 +502,7 @@ class IntrusionAlgorithm:
self.state = self.STATE_IDLE self.state = self.STATE_IDLE
self.state_start_time = None self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)") logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)")
elif elapsed >= self.confirm_seconds: elif elapsed >= self.confirm_intrusion_seconds:
# 入侵确认成功,检查冷却期 # 入侵确认成功,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}" cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \ if cooldown_key not in self.alert_cooldowns or \
@@ -534,30 +549,46 @@ class IntrusionAlgorithm:
elapsed = (current_time - self.state_start_time).total_seconds() elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person: if roi_has_person:
# 人又出现了回到ALARMED # 检测到有
self.state = self.STATE_ALARMED if self._person_detected_in_clear_time is None:
self.state_start_time = current_time # 第一次检测到人,记录时间
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (人又出现)") self._person_detected_in_clear_time = current_time
elif elapsed >= self.confirm_seconds: logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR 检测到人,开始确认(需持续{self.confirm_intrusion_seconds}秒)")
# 消失确认成功发送resolve事件 else:
if self._last_alarm_id and self._intrusion_start_time: # 持续有人,检查是否达到确认时间
duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000) person_elapsed = (current_time - self._person_detected_in_clear_time).total_seconds()
alerts.append({ if person_elapsed >= self.confirm_intrusion_seconds:
"alert_type": "alarm_resolve", # 确认有人重新入侵回到ALARMED
"resolve_alarm_id": self._last_alarm_id, self.state = self.STATE_ALARMED
"duration_ms": duration_ms, self.state_start_time = None
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), self._person_detected_in_clear_time = None
"resolve_type": "intrusion_cleared", logger.info(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (确认有人重新入侵,持续{person_elapsed:.1f}秒)")
}) else:
# 没有人
self._person_detected_in_clear_time = None # 清除临时计时
logger.info(f"ROI {roi_id}: 告警已解决(入侵消失)") # 检查是否达到消失确认时间
if elapsed >= self.confirm_clear_seconds:
# 消失确认成功发送resolve事件
if self._last_alarm_id and self._intrusion_start_time:
duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "intrusion_cleared",
})
# 重置状态 logger.info(f"ROI {roi_id}: 告警已解决(入侵消失,持续无人{elapsed:.1f}秒)")
self.state = self.STATE_IDLE
self.state_start_time = None # 重置状态
self._last_alarm_id = None self.state = self.STATE_IDLE
self._intrusion_start_time = None self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)") self._last_alarm_id = None
self._intrusion_start_time = None
self._person_detected_in_clear_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)")
return alerts return alerts
@@ -571,6 +602,7 @@ class IntrusionAlgorithm:
self.state_start_time = None self.state_start_time = None
self._last_alarm_id = None self._last_alarm_id = None
self._intrusion_start_time = None self._intrusion_start_time = None
self._person_detected_in_clear_time = None
self.alert_cooldowns.clear() self.alert_cooldowns.clear()
# 向后兼容 # 向后兼容
self.last_alert_time.clear() self.last_alert_time.clear()