功能:新增车辆违停和车辆拥堵检测算法

- IllegalParkingAlgorithm: 5状态机(IDLE→CONFIRMING_VEHICLE→PARKED_COUNTDOWN→ALARMED→CONFIRMING_CLEAR)
  禁停区域检测,15秒确认+5分钟倒计时,滑动窗口抗抖动,支持car/truck/bus/motorcycle
- VehicleCongestionAlgorithm: 4状态机(NORMAL→CONFIRMING_CONGESTION→CONGESTED→CONFIRMING_CLEAR)
  车辆计数≥阈值+持续60秒触发,滑动窗口平均值判断
- AlgorithmManager: 新增default_params、register_algorithm、get_status支持两种新算法
- main.py: 泛化alarm_id回填和first_frame_time提取,ext_data新增vehicle_count字段

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-16 16:54:47 +08:00
parent 9a1ac16f19
commit ea992c6daa
2 changed files with 536 additions and 28 deletions

View File

@@ -705,6 +705,494 @@ class IntrusionAlgorithm:
# self.alert_triggered.clear()
class IllegalParkingAlgorithm:
"""
车辆违停检测算法(状态机版本 v1.0
状态机:
IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE
业务流程:
1. 检测到车辆进入禁停区 → 车辆确认期confirm_vehicle_sec默认15秒ratio≥0.6
2. 确认有车 → 违停倒计时parking_countdown_sec默认300秒/5分钟
3. 倒计时结束仍有车 → 触发告警ALARMED状态
4. 车辆离开 → 消失确认期confirm_clear_sec默认30秒ratio<0.2
5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态
使用滑动窗口10秒抗抖动支持多类车辆检测。
"""
# 状态定义
STATE_IDLE = "IDLE"
STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE"
STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN"
STATE_ALARMED = "ALARMED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别常量
ALARM_LEVEL_ILLEGAL_PARKING = 2 # 一般级别
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
def __init__(
self,
confirm_vehicle_sec: int = 15,
parking_countdown_sec: int = 300,
confirm_clear_sec: int = 30,
cooldown_sec: int = 600,
target_classes: Optional[List[str]] = None,
):
self.confirm_vehicle_sec = confirm_vehicle_sec
self.parking_countdown_sec = parking_countdown_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
# 状态变量
self.state: str = self.STATE_IDLE
self.state_start_time: Optional[datetime] = None
# 滑动窗口:存储 (timestamp, has_vehicle: bool)
self._detection_window: deque = deque()
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._parking_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_classes(self, detection: Dict) -> bool:
"""检查检测目标是否属于车辆类别"""
det_class = detection.get("class", "")
return det_class in self.target_classes
def _update_window(self, current_time: datetime, has_vehicle: bool):
"""更新滑动窗口"""
self._detection_window.append((current_time, has_vehicle))
cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC)
while self._detection_window and self._detection_window[0][0] < cutoff:
self._detection_window.popleft()
def _get_window_ratio(self) -> float:
"""获取滑动窗口内的检测命中率"""
if not self._detection_window:
return 0.0
hits = sum(1 for _, has in self._detection_window if has)
return hits / len(self._detection_window)
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
return det.get("bbox", [])
return []
def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float:
"""获取ROI内车辆的最高置信度"""
max_conf = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
max_conf = max(max_conf, det.get("confidence", 0.0))
return max_conf
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理单帧检测结果"""
current_time = current_time or datetime.now()
alerts = []
# 检查ROI内是否有车辆
roi_has_vehicle = any(
self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det)
for det in tracks
)
# 更新滑动窗口
self._update_window(current_time, roi_has_vehicle)
ratio = self._get_window_ratio()
# === 状态机处理 ===
if self.state == self.STATE_IDLE:
if roi_has_vehicle:
self.state = self.STATE_CONFIRMING_VEHICLE
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE")
elif self.state == self.STATE_CONFIRMING_VEHICLE:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < 0.3:
# 命中率过低,车辆可能只是路过
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)")
elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6:
# 确认有车辆停留,进入倒计时
self._parking_start_time = self.state_start_time
self.state = self.STATE_PARKED_COUNTDOWN
self.state_start_time = current_time
logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (ratio={ratio:.2f})")
elif self.state == self.STATE_PARKED_COUNTDOWN:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < 0.2:
# 车辆已离开
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (车辆离开, ratio={ratio:.2f})")
elif elapsed >= self.parking_countdown_sec:
# 倒计时结束,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
bbox = self._get_latest_bbox(tracks, roi_id)
confidence = self._get_max_confidence(tracks, roi_id)
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "illegal_parking",
"alarm_level": self.ALARM_LEVEL_ILLEGAL_PARKING,
"confidence": confidence,
"message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)",
"first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None,
"duration_minutes": elapsed / 60,
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (违停告警触发)")
else:
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)")
elif self.state == self.STATE_ALARMED:
if ratio < 0.3:
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR")
elif self.state == self.STATE_CONFIRMING_CLEAR:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio >= 0.5:
# 车辆又出现回到ALARMED
self.state = self.STATE_ALARMED
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)")
elif elapsed >= self.confirm_clear_sec and ratio < 0.2:
# 确认车辆已离开
if self._last_alarm_id and self._parking_start_time:
duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "vehicle_left",
})
logger.info(f"ROI {roi_id}: 违停告警已解决(车辆离开)")
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE")
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
"""重置算法状态"""
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
self._detection_window.clear()
self.alert_cooldowns.clear()
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态"""
current_time = current_time or datetime.now()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"window_ratio": self._get_window_ratio(),
}
if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time:
state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds()
state_info["alarm_id"] = self._last_alarm_id
return state_info
class VehicleCongestionAlgorithm:
"""
车辆拥堵检测算法(状态机版本 v1.0
状态机:
NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL
业务流程:
1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期confirm_congestion_sec默认60秒
2. 确认拥堵(窗口内平均车辆数 ≥ threshold→ 触发告警
3. 车辆减少 → 消散确认期confirm_clear_sec默认120秒
4. 确认消散(平均数 < threshold→ 发送resolve事件 → 回到正常
使用滑动窗口10秒存储车辆计数取平均值判断。
"""
# 状态定义
STATE_NORMAL = "NORMAL"
STATE_CONFIRMING_CONGESTION = "CONFIRMING_CONGESTION"
STATE_CONGESTED = "CONGESTED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别
ALARM_LEVEL_CONGESTION = 2 # 一般级别
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
def __init__(
self,
count_threshold: int = 3,
confirm_congestion_sec: int = 60,
confirm_clear_sec: int = 120,
cooldown_sec: int = 600,
target_classes: Optional[List[str]] = None,
):
self.count_threshold = count_threshold
self.confirm_congestion_sec = confirm_congestion_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
# 状态变量
self.state: str = self.STATE_NORMAL
self.state_start_time: Optional[datetime] = None
# 滑动窗口:存储 (timestamp, vehicle_count: int)
self._count_window: deque = deque()
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._congestion_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_classes(self, detection: Dict) -> bool:
det_class = detection.get("class", "")
return det_class in self.target_classes
def _count_vehicles_in_roi(self, tracks: List[Dict], roi_id: str) -> int:
"""统计ROI内的车辆数量"""
return sum(
1 for det in tracks
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det)
)
def _update_count_window(self, current_time: datetime, count: int):
"""更新车辆计数滑动窗口"""
self._count_window.append((current_time, count))
cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC)
while self._count_window and self._count_window[0][0] < cutoff:
self._count_window.popleft()
def _get_avg_count(self) -> float:
"""获取滑动窗口内的平均车辆数"""
if not self._count_window:
return 0.0
total = sum(c for _, c in self._count_window)
return total / len(self._count_window)
def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float:
max_conf = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
max_conf = max(max_conf, det.get("confidence", 0.0))
return max_conf
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
return det.get("bbox", [])
return []
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理单帧检测结果"""
current_time = current_time or datetime.now()
alerts = []
# 统计ROI内车辆数
vehicle_count = self._count_vehicles_in_roi(tracks, roi_id)
self._update_count_window(current_time, vehicle_count)
avg_count = self._get_avg_count()
# === 状态机处理 ===
if self.state == self.STATE_NORMAL:
if avg_count >= self.count_threshold:
self.state = self.STATE_CONFIRMING_CONGESTION
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}{self.count_threshold})")
elif self.state == self.STATE_CONFIRMING_CONGESTION:
if self.state_start_time is None:
self.state = self.STATE_NORMAL
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if avg_count < self.count_threshold:
# 车辆减少,回到正常
self.state = self.STATE_NORMAL
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (avg={avg_count:.1f}<{self.count_threshold})")
elif elapsed >= self.confirm_congestion_sec:
# 确认拥堵,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
self._congestion_start_time = self.state_start_time
bbox = self._get_latest_bbox(tracks, roi_id)
confidence = self._get_max_confidence(tracks, roi_id)
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "vehicle_congestion",
"alarm_level": self.ALARM_LEVEL_CONGESTION,
"confidence": confidence,
"message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)",
"first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None,
"vehicle_count": int(avg_count),
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_CONGESTED
logger.warning(f"ROI {roi_id}: CONFIRMING_CONGESTION → CONGESTED (拥堵告警触发, avg={avg_count:.1f})")
else:
self.state = self.STATE_NORMAL
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)")
elif self.state == self.STATE_CONGESTED:
if avg_count < self.count_threshold:
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold})")
elif self.state == self.STATE_CONFIRMING_CLEAR:
if self.state_start_time is None:
self.state = self.STATE_NORMAL
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if avg_count >= self.count_threshold:
# 又拥堵了回到CONGESTED
self.state = self.STATE_CONGESTED
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}{self.count_threshold})")
elif elapsed >= self.confirm_clear_sec:
# 确认消散
if self._last_alarm_id and self._congestion_start_time:
duration_ms = int((current_time - self._congestion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "congestion_cleared",
})
logger.info(f"ROI {roi_id}: 拥堵告警已解决(车辆减少)")
self.state = self.STATE_NORMAL
self.state_start_time = None
self._last_alarm_id = None
self._congestion_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL")
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
"""重置算法状态"""
self.state = self.STATE_NORMAL
self.state_start_time = None
self._last_alarm_id = None
self._congestion_start_time = None
self._count_window.clear()
self.alert_cooldowns.clear()
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态"""
current_time = current_time or datetime.now()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"avg_vehicle_count": self._get_avg_count(),
}
if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time:
state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds()
state_info["alarm_id"] = self._last_alarm_id
return state_info
class AlgorithmManager:
def __init__(self, working_hours: Optional[List[Dict]] = None):
self.algorithms: Dict[str, Dict[str, Any]] = {}
@@ -724,11 +1212,20 @@ class AlgorithmManager:
"confirm_seconds": 5,
"target_class": None,
},
# "crowd_detection": {
# "max_count": 10,
# "cooldown_seconds": 300,
# "target_class": "person",
# },
"illegal_parking": {
"confirm_vehicle_sec": 15,
"parking_countdown_sec": 300,
"confirm_clear_sec": 30,
"cooldown_sec": 600,
"target_classes": ["car", "truck", "bus", "motorcycle"],
},
"vehicle_congestion": {
"count_threshold": 3,
"confirm_congestion_sec": 60,
"confirm_clear_sec": 120,
"cooldown_sec": 600,
"target_classes": ["car", "truck", "bus", "motorcycle"],
},
}
self._pubsub = None
@@ -1090,13 +1587,22 @@ class AlgorithmManager:
confirm_seconds=algo_params.get("confirm_seconds", 5),
target_class=algo_params.get("target_class"),
)
# elif algorithm_type == "crowd_detection":
# from algorithms import CrowdDetectionAlgorithm
# self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm(
# max_count=algo_params.get("max_count", 10),
# cooldown_seconds=algo_params.get("cooldown_seconds", 300),
# target_class=algo_params.get("target_class", "person"),
# )
elif algorithm_type == "illegal_parking":
self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm(
confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15),
parking_countdown_sec=algo_params.get("parking_countdown_sec", 300),
confirm_clear_sec=algo_params.get("confirm_clear_sec", 30),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
)
elif algorithm_type == "vehicle_congestion":
self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm(
count_threshold=algo_params.get("count_threshold", 3),
confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60),
confirm_clear_sec=algo_params.get("confirm_clear_sec", 120),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
)
self._registered_keys.add(cache_key)
@@ -1189,6 +1695,8 @@ class AlgorithmManager:
"state": getattr(algo, "state", "WAITING"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
elif algo_type in ("illegal_parking", "vehicle_congestion"):
status[f"{algo_type}_{bind_id}"] = algo.get_state()
else:
status[f"{algo_type}_{bind_id}"] = {
"detection_count": len(getattr(algo, "detection_start", {})),

32
main.py
View File

@@ -763,12 +763,16 @@ class EdgeInferenceService:
self._camera_alert_cooldown[dedup_key] = now
self._performance_stats["total_alerts_generated"] += 1
# 获取算法的离岗开始时间
leave_start_time = None
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time:
leave_start_time = algo._leave_start_time.isoformat()
# 获取算法的事件开始时间(泛化:支持所有算法类型)
first_frame_time = None
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type)
if algo:
# 各算法使用不同的内部变量名存储开始时间
for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'):
val = getattr(algo, attr, None)
if val:
first_frame_time = val.isoformat()
break
from core.result_reporter import AlarmInfo, generate_alarm_id
alarm_info = AlarmInfo(
@@ -788,20 +792,16 @@ class EdgeInferenceService:
"bind_id": bind.bind_id,
"message": alert.get("message", ""),
"edge_node_id": self._settings.mqtt.device_id,
"first_frame_time": leave_start_time,
"first_frame_time": first_frame_time,
"vehicle_count": alert.get("vehicle_count"),
},
)
self._reporter.report_alarm(alarm_info, screenshot=frame.image)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪)
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
elif alert_type == "intrusion":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("intrusion")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type)
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
self._logger.info(
f"告警已生成: type={alert_type}, "