diff --git a/algorithms.py b/algorithms.py index fc25168..5bb10e7 100644 --- a/algorithms.py +++ b/algorithms.py @@ -705,6 +705,494 @@ class IntrusionAlgorithm: # self.alert_triggered.clear() +class IllegalParkingAlgorithm: + """ + 车辆违停检测算法(状态机版本 v1.0) + + 状态机: + IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE + + 业务流程: + 1. 检测到车辆进入禁停区 → 车辆确认期(confirm_vehicle_sec,默认15秒,ratio≥0.6) + 2. 确认有车 → 违停倒计时(parking_countdown_sec,默认300秒/5分钟) + 3. 倒计时结束仍有车 → 触发告警(ALARMED状态) + 4. 车辆离开 → 消失确认期(confirm_clear_sec,默认30秒,ratio<0.2) + 5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态 + + 使用滑动窗口(10秒)抗抖动,支持多类车辆检测。 + """ + + # 状态定义 + STATE_IDLE = "IDLE" + STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE" + STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN" + STATE_ALARMED = "ALARMED" + STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" + + # 告警级别常量 + ALARM_LEVEL_ILLEGAL_PARKING = 2 # 一般级别 + + # 滑动窗口参数 + WINDOW_SIZE_SEC = 10 + + def __init__( + self, + confirm_vehicle_sec: int = 15, + parking_countdown_sec: int = 300, + confirm_clear_sec: int = 30, + cooldown_sec: int = 600, + target_classes: Optional[List[str]] = None, + ): + self.confirm_vehicle_sec = confirm_vehicle_sec + self.parking_countdown_sec = parking_countdown_sec + self.confirm_clear_sec = confirm_clear_sec + self.cooldown_sec = cooldown_sec + self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + + # 状态变量 + self.state: str = self.STATE_IDLE + self.state_start_time: Optional[datetime] = None + + # 滑动窗口:存储 (timestamp, has_vehicle: bool) + self._detection_window: deque = deque() + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._parking_start_time: Optional[datetime] = None + + # 冷却期管理 + self.alert_cooldowns: Dict[str, datetime] = {} + + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True + return False + + def _check_target_classes(self, detection: Dict) -> bool: + """检查检测目标是否属于车辆类别""" + det_class = detection.get("class", "") + return det_class in self.target_classes + + def _update_window(self, current_time: datetime, has_vehicle: bool): + """更新滑动窗口""" + self._detection_window.append((current_time, has_vehicle)) + cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) + while self._detection_window and self._detection_window[0][0] < cutoff: + self._detection_window.popleft() + + def _get_window_ratio(self) -> float: + """获取滑动窗口内的检测命中率""" + if not self._detection_window: + return 0.0 + hits = sum(1 for _, has in self._detection_window if has) + return hits / len(self._detection_window) + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + return det.get("bbox", []) + return [] + + def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: + """获取ROI内车辆的最高置信度""" + max_conf = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + max_conf = max(max_conf, det.get("confidence", 0.0)) + return max_conf + + def process( + self, + roi_id: str, + camera_id: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, + ) -> List[Dict]: + """处理单帧检测结果""" + current_time = current_time or datetime.now() + alerts = [] + + # 检查ROI内是否有车辆 + roi_has_vehicle = any( + self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) + for det in tracks + ) + + # 更新滑动窗口 + self._update_window(current_time, roi_has_vehicle) + ratio = self._get_window_ratio() + + # === 状态机处理 === + + if self.state == self.STATE_IDLE: + if roi_has_vehicle: + self.state = self.STATE_CONFIRMING_VEHICLE + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE") + + elif self.state == self.STATE_CONFIRMING_VEHICLE: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio < 0.3: + # 命中率过低,车辆可能只是路过 + self.state = self.STATE_IDLE + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)") + elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6: + # 确认有车辆停留,进入倒计时 + self._parking_start_time = self.state_start_time + self.state = self.STATE_PARKED_COUNTDOWN + self.state_start_time = current_time + logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (ratio={ratio:.2f})") + + elif self.state == self.STATE_PARKED_COUNTDOWN: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio < 0.2: + # 车辆已离开 + self.state = self.STATE_IDLE + self.state_start_time = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (车辆离开, ratio={ratio:.2f})") + elif elapsed >= self.parking_countdown_sec: + # 倒计时结束,检查冷却期 + cooldown_key = f"{camera_id}_{roi_id}" + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + + bbox = self._get_latest_bbox(tracks, roi_id) + confidence = self._get_max_confidence(tracks, roi_id) + + alerts.append({ + "roi_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "alert_type": "illegal_parking", + "alarm_level": self.ALARM_LEVEL_ILLEGAL_PARKING, + "confidence": confidence, + "message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)", + "first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None, + "duration_minutes": elapsed / 60, + }) + + self.alert_cooldowns[cooldown_key] = current_time + self.state = self.STATE_ALARMED + logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (违停告警触发)") + else: + self.state = self.STATE_IDLE + self.state_start_time = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)") + + elif self.state == self.STATE_ALARMED: + if ratio < 0.3: + self.state = self.STATE_CONFIRMING_CLEAR + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR") + + elif self.state == self.STATE_CONFIRMING_CLEAR: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio >= 0.5: + # 车辆又出现,回到ALARMED + self.state = self.STATE_ALARMED + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)") + elif elapsed >= self.confirm_clear_sec and ratio < 0.2: + # 确认车辆已离开 + if self._last_alarm_id and self._parking_start_time: + duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "vehicle_left", + }) + logger.info(f"ROI {roi_id}: 违停告警已解决(车辆离开)") + + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE") + + return alerts + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + + def reset(self): + """重置算法状态""" + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._parking_start_time = None + self._detection_window.clear() + self.alert_cooldowns.clear() + + def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: + """获取当前状态""" + current_time = current_time or datetime.now() + state_info = { + "state": self.state, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + "window_ratio": self._get_window_ratio(), + } + if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time: + state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds() + state_info["alarm_id"] = self._last_alarm_id + return state_info + + +class VehicleCongestionAlgorithm: + """ + 车辆拥堵检测算法(状态机版本 v1.0) + + 状态机: + NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL + + 业务流程: + 1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期(confirm_congestion_sec,默认60秒) + 2. 确认拥堵(窗口内平均车辆数 ≥ threshold)→ 触发告警 + 3. 车辆减少 → 消散确认期(confirm_clear_sec,默认120秒) + 4. 确认消散(平均数 < threshold)→ 发送resolve事件 → 回到正常 + + 使用滑动窗口(10秒)存储车辆计数,取平均值判断。 + """ + + # 状态定义 + STATE_NORMAL = "NORMAL" + STATE_CONFIRMING_CONGESTION = "CONFIRMING_CONGESTION" + STATE_CONGESTED = "CONGESTED" + STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" + + # 告警级别 + ALARM_LEVEL_CONGESTION = 2 # 一般级别 + + # 滑动窗口参数 + WINDOW_SIZE_SEC = 10 + + def __init__( + self, + count_threshold: int = 3, + confirm_congestion_sec: int = 60, + confirm_clear_sec: int = 120, + cooldown_sec: int = 600, + target_classes: Optional[List[str]] = None, + ): + self.count_threshold = count_threshold + self.confirm_congestion_sec = confirm_congestion_sec + self.confirm_clear_sec = confirm_clear_sec + self.cooldown_sec = cooldown_sec + self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + + # 状态变量 + self.state: str = self.STATE_NORMAL + self.state_start_time: Optional[datetime] = None + + # 滑动窗口:存储 (timestamp, vehicle_count: int) + self._count_window: deque = deque() + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._congestion_start_time: Optional[datetime] = None + + # 冷却期管理 + self.alert_cooldowns: Dict[str, datetime] = {} + + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True + return False + + def _check_target_classes(self, detection: Dict) -> bool: + det_class = detection.get("class", "") + return det_class in self.target_classes + + def _count_vehicles_in_roi(self, tracks: List[Dict], roi_id: str) -> int: + """统计ROI内的车辆数量""" + return sum( + 1 for det in tracks + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) + ) + + def _update_count_window(self, current_time: datetime, count: int): + """更新车辆计数滑动窗口""" + self._count_window.append((current_time, count)) + cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) + while self._count_window and self._count_window[0][0] < cutoff: + self._count_window.popleft() + + def _get_avg_count(self) -> float: + """获取滑动窗口内的平均车辆数""" + if not self._count_window: + return 0.0 + total = sum(c for _, c in self._count_window) + return total / len(self._count_window) + + def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: + max_conf = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + max_conf = max(max_conf, det.get("confidence", 0.0)) + return max_conf + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + return det.get("bbox", []) + return [] + + def process( + self, + roi_id: str, + camera_id: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, + ) -> List[Dict]: + """处理单帧检测结果""" + current_time = current_time or datetime.now() + alerts = [] + + # 统计ROI内车辆数 + vehicle_count = self._count_vehicles_in_roi(tracks, roi_id) + self._update_count_window(current_time, vehicle_count) + avg_count = self._get_avg_count() + + # === 状态机处理 === + + if self.state == self.STATE_NORMAL: + if avg_count >= self.count_threshold: + self.state = self.STATE_CONFIRMING_CONGESTION + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}≥{self.count_threshold})") + + elif self.state == self.STATE_CONFIRMING_CONGESTION: + if self.state_start_time is None: + self.state = self.STATE_NORMAL + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if avg_count < self.count_threshold: + # 车辆减少,回到正常 + self.state = self.STATE_NORMAL + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (avg={avg_count:.1f}<{self.count_threshold})") + elif elapsed >= self.confirm_congestion_sec: + # 确认拥堵,检查冷却期 + cooldown_key = f"{camera_id}_{roi_id}" + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + + self._congestion_start_time = self.state_start_time + bbox = self._get_latest_bbox(tracks, roi_id) + confidence = self._get_max_confidence(tracks, roi_id) + + alerts.append({ + "roi_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "alert_type": "vehicle_congestion", + "alarm_level": self.ALARM_LEVEL_CONGESTION, + "confidence": confidence, + "message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)", + "first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None, + "vehicle_count": int(avg_count), + }) + + self.alert_cooldowns[cooldown_key] = current_time + self.state = self.STATE_CONGESTED + logger.warning(f"ROI {roi_id}: CONFIRMING_CONGESTION → CONGESTED (拥堵告警触发, avg={avg_count:.1f})") + else: + self.state = self.STATE_NORMAL + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)") + + elif self.state == self.STATE_CONGESTED: + if avg_count < self.count_threshold: + self.state = self.STATE_CONFIRMING_CLEAR + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold})") + + elif self.state == self.STATE_CONFIRMING_CLEAR: + if self.state_start_time is None: + self.state = self.STATE_NORMAL + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if avg_count >= self.count_threshold: + # 又拥堵了,回到CONGESTED + self.state = self.STATE_CONGESTED + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}≥{self.count_threshold})") + elif elapsed >= self.confirm_clear_sec: + # 确认消散 + if self._last_alarm_id and self._congestion_start_time: + duration_ms = int((current_time - self._congestion_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "congestion_cleared", + }) + logger.info(f"ROI {roi_id}: 拥堵告警已解决(车辆减少)") + + self.state = self.STATE_NORMAL + self.state_start_time = None + self._last_alarm_id = None + self._congestion_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL") + + return alerts + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + + def reset(self): + """重置算法状态""" + self.state = self.STATE_NORMAL + self.state_start_time = None + self._last_alarm_id = None + self._congestion_start_time = None + self._count_window.clear() + self.alert_cooldowns.clear() + + def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: + """获取当前状态""" + current_time = current_time or datetime.now() + state_info = { + "state": self.state, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + "avg_vehicle_count": self._get_avg_count(), + } + if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time: + state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds() + state_info["alarm_id"] = self._last_alarm_id + return state_info + + class AlgorithmManager: def __init__(self, working_hours: Optional[List[Dict]] = None): self.algorithms: Dict[str, Dict[str, Any]] = {} @@ -724,11 +1212,20 @@ class AlgorithmManager: "confirm_seconds": 5, "target_class": None, }, - # "crowd_detection": { - # "max_count": 10, - # "cooldown_seconds": 300, - # "target_class": "person", - # }, + "illegal_parking": { + "confirm_vehicle_sec": 15, + "parking_countdown_sec": 300, + "confirm_clear_sec": 30, + "cooldown_sec": 600, + "target_classes": ["car", "truck", "bus", "motorcycle"], + }, + "vehicle_congestion": { + "count_threshold": 3, + "confirm_congestion_sec": 60, + "confirm_clear_sec": 120, + "cooldown_sec": 600, + "target_classes": ["car", "truck", "bus", "motorcycle"], + }, } self._pubsub = None @@ -1090,13 +1587,22 @@ class AlgorithmManager: confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), ) - # elif algorithm_type == "crowd_detection": - # from algorithms import CrowdDetectionAlgorithm - # self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm( - # max_count=algo_params.get("max_count", 10), - # cooldown_seconds=algo_params.get("cooldown_seconds", 300), - # target_class=algo_params.get("target_class", "person"), - # ) + elif algorithm_type == "illegal_parking": + self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( + confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15), + parking_countdown_sec=algo_params.get("parking_countdown_sec", 300), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 30), + cooldown_sec=algo_params.get("cooldown_sec", 600), + target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + ) + elif algorithm_type == "vehicle_congestion": + self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( + count_threshold=algo_params.get("count_threshold", 3), + confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), + cooldown_sec=algo_params.get("cooldown_sec", 600), + target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + ) self._registered_keys.add(cache_key) @@ -1189,6 +1695,8 @@ class AlgorithmManager: "state": getattr(algo, "state", "WAITING"), "alarm_sent": getattr(algo, "alarm_sent", False), } + elif algo_type in ("illegal_parking", "vehicle_congestion"): + status[f"{algo_type}_{bind_id}"] = algo.get_state() else: status[f"{algo_type}_{bind_id}"] = { "detection_count": len(getattr(algo, "detection_start", {})), diff --git a/main.py b/main.py index f7db103..b06c11e 100644 --- a/main.py +++ b/main.py @@ -763,12 +763,16 @@ class EdgeInferenceService: self._camera_alert_cooldown[dedup_key] = now self._performance_stats["total_alerts_generated"] += 1 - # 获取算法的离岗开始时间 - leave_start_time = None - if alert_type == "leave_post": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") - if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time: - leave_start_time = algo._leave_start_time.isoformat() + # 获取算法的事件开始时间(泛化:支持所有算法类型) + first_frame_time = None + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) + if algo: + # 各算法使用不同的内部变量名存储开始时间 + for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'): + val = getattr(algo, attr, None) + if val: + first_frame_time = val.isoformat() + break from core.result_reporter import AlarmInfo, generate_alarm_id alarm_info = AlarmInfo( @@ -788,20 +792,16 @@ class EdgeInferenceService: "bind_id": bind.bind_id, "message": alert.get("message", ""), "edge_node_id": self._settings.mqtt.device_id, - "first_frame_time": leave_start_time, + "first_frame_time": first_frame_time, + "vehicle_count": alert.get("vehicle_count"), }, ) self._reporter.report_alarm(alarm_info, screenshot=frame.image) - # 回填 alarm_id 到算法实例(用于后续 resolve 追踪) - if alert_type == "leave_post": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") - if algo and hasattr(algo, 'set_last_alarm_id'): - algo.set_last_alarm_id(alarm_info.alarm_id) - elif alert_type == "intrusion": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("intrusion") - if algo and hasattr(algo, 'set_last_alarm_id'): - algo.set_last_alarm_id(alarm_info.alarm_id) + # 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型) + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) + if algo and hasattr(algo, 'set_last_alarm_id'): + algo.set_last_alarm_id(alarm_info.alarm_id) self._logger.info( f"告警已生成: type={alert_type}, "