feat(intrusion): 添加自动告警处理功能 - 状态机和resolve事件

- 新增状态机: IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE
- 入侵消失后,连续confirm_seconds秒无人,自动发送alarm_resolve
- 告警追踪: _last_alarm_id, _intrusion_start_time
- 冷却期逻辑保留,防止重复告警

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-14 10:14:46 +08:00
parent d6644a65f3
commit 2d90f00b11

View File

@@ -370,6 +370,25 @@ class LeavePostAlgorithm:
class IntrusionAlgorithm:
"""
周界入侵检测算法(状态机版本 v2.0
状态机:
IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE
业务流程:
1. 检测到人 → 入侵确认期confirm_seconds默认5秒
2. 确认入侵 → 触发告警ALARMED状态
3. 人离开ROI → 入侵消失确认期confirm_seconds默认5秒
4. 确认消失 → 发送resolve事件 → 回到空闲状态
"""
# 状态定义
STATE_IDLE = "IDLE" # 空闲(无入侵)
STATE_CONFIRMING_INTRUSION = "CONFIRMING_INTRUSION" # 入侵确认中
STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失)
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中
def __init__(
self,
cooldown_seconds: int = 300,
@@ -380,6 +399,18 @@ class IntrusionAlgorithm:
self.confirm_seconds = confirm_seconds
self.target_class = target_class
# 状态变量
self.state: str = self.STATE_IDLE
self.state_start_time: Optional[datetime] = None
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._intrusion_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
# 向后兼容:保留旧变量(不再使用)
self.last_alert_time: Dict[str, datetime] = {}
self.alert_triggered: Dict[str, bool] = {}
self.detection_start: Dict[str, Optional[datetime]] = {}
@@ -409,52 +440,145 @@ class IntrusionAlgorithm:
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""
处理单帧检测结果
Args:
roi_id: ROI区域ID
camera_id: 摄像头ID
tracks: 检测结果列表
current_time: 当前时间用于测试生产环境传None
Returns:
告警列表 [{"alert_type": "intrusion", ...}, {"alert_type": "alarm_resolve", ...}]
"""
current_time = current_time or datetime.now()
key = f"{camera_id}_{roi_id}"
alerts = []
roi_has_person = False
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
roi_has_person = True
break
# 检查ROI内是否有目标
roi_has_person = any(
self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class)
for det in tracks
)
if not roi_has_person:
self.detection_start.pop(key, None)
self.alert_triggered[key] = False
return []
# === 状态机处理 ===
if self.alert_triggered.get(key, False):
elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds()
if elapsed_since_alert < self.cooldown_seconds:
return []
self.alert_triggered[key] = False
if self.state == self.STATE_IDLE:
# 空闲状态:等待检测到入侵
if roi_has_person:
self.state = self.STATE_CONFIRMING_INTRUSION
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_INTRUSION")
if self.detection_start.get(key) is None:
self.detection_start[key] = current_time
elif self.state == self.STATE_CONFIRMING_INTRUSION:
# 入侵确认中:需要持续检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
elapsed = (current_time - self.detection_start[key]).total_seconds()
if elapsed < self.confirm_seconds:
return []
if not roi_has_person:
# 人消失了回到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)")
elif elapsed >= self.confirm_seconds:
# 入侵确认成功,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_seconds:
bbox = self._get_latest_bbox(tracks, roi_id)
self.last_alert_time[key] = current_time
self.alert_triggered[key] = True
self.detection_start[key] = None
bbox = self._get_latest_bbox(tracks, roi_id)
self._intrusion_start_time = self.state_start_time # 记录入侵开始时间
return [{
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": 3,
"message": "检测到周界入侵",
}]
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": 3,
"message": "检测到周界入侵",
"first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
# _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填
logger.warning(f"ROI {roi_id}: CONFIRMING_INTRUSION → ALARMED (告警触发)")
else:
# 冷却期内回到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (冷却期内)")
elif self.state == self.STATE_ALARMED:
# 已告警状态:等待入侵消失
if not roi_has_person:
# 检测到无人,进入消失确认
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR")
elif self.state == self.STATE_CONFIRMING_CLEAR:
# 入侵消失确认中:需要持续未检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人又出现了回到ALARMED
self.state = self.STATE_ALARMED
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (人又出现)")
elif elapsed >= self.confirm_seconds:
# 消失确认成功发送resolve事件
if self._last_alarm_id and self._intrusion_start_time:
duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "intrusion_cleared",
})
logger.info(f"ROI {roi_id}: 告警已解决(入侵消失)")
# 重置状态
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._intrusion_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)")
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
"""重置算法状态"""
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._intrusion_start_time = None
self.alert_cooldowns.clear()
# 向后兼容
self.last_alert_time.clear()
self.alert_triggered.clear()
self.detection_start.clear()
def get_state(self, roi_id: str) -> Dict[str, Any]:
"""获取当前状态(用于调试和监控)"""
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
}
# 添加状态特定信息
if self.state == self.STATE_ALARMED and self._intrusion_start_time:
total_intrusion_sec = (datetime.now() - self._intrusion_start_time).total_seconds()
state_info["total_intrusion_sec"] = total_intrusion_sec
state_info["alarm_id"] = self._last_alarm_id
return state_info
# class CrowdDetectionAlgorithm:
# """人群聚集检测算法 - 暂时注释,后续需要时再启用"""