From a9a545758353111b07148d4254a5373be26baf7d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 7 Apr 2026 11:37:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E7=AE=97=E6=B3=95?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=85=A8=E9=9D=A2=E9=87=8D=E6=9E=84=20?= =?UTF-8?q?=E2=80=94=20=E6=8F=90=E5=8F=96=E5=9F=BA=E7=B1=BB=E3=80=81?= =?UTF-8?q?=E5=B8=B8=E9=87=8F=E5=8C=96=E9=98=88=E5=80=BC=E3=80=81=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 提取 BaseAlgorithm 基类,四个算法共享 ROI 检查、目标类过滤、告警 ID 管理 - 硬编码比率阈值提取为类常量(RATIO_ON_DUTY_CONFIRM 等) - 滑动窗口添加 maxlen=1000 防内存溢出 - tracks 合并遍历 _scan_tracks() 减少重复遍历 - 比率/均值缓存,process() 入口计算一次 - 拥堵消散比例可配置(dissipation_ratio 参数) - 入侵 CONFIRMING_CLEAR 逻辑拆分为独立方法 - 补齐 AlgorithmType 枚举(illegal_parking、vehicle_congestion) - 修复 _leave_start_time None guard 防 TypeError - 修复 AlgorithmManager 默认参数与构造函数不一致 - 热更新补充支持 illegal_parking 和 vehicle_congestion Co-Authored-By: Claude Opus 4.6 (1M context) --- algorithms.py | 547 ++++++++++++++++++++----------- config/config_models.py | 2 + docs/code_review_report.md | 646 +++++++++++++++++++++++++++++++++++++ 3 files changed, 1006 insertions(+), 189 deletions(-) create mode 100644 docs/code_review_report.md diff --git a/algorithms.py b/algorithms.py index 770ebfe..6ef6cff 100644 --- a/algorithms.py +++ b/algorithms.py @@ -16,7 +16,42 @@ logger = logging.getLogger(__name__) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -class LeavePostAlgorithm: +class BaseAlgorithm: + """ + 算法基类,提取各算法共同逻辑。 + + 子类须定义自己的状态常量和 process() 方法。 + """ + + def __init__(self): + # 状态变量(子类构造函数中可覆盖 self.state 的初始值) + self.state: str = "" + self.state_start_time: Optional[datetime] = None + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + + # ---- 公共工具方法 ---- + + @staticmethod + def _check_detection_in_roi(detection: Dict, roi_id: str) -> bool: + """检查检测结果是否在ROI内""" + matched_rois = detection.get("matched_rois", []) + return any(roi.get("roi_id") == roi_id for roi in matched_rois) + + @staticmethod + def _check_target_class(detection: Dict, target_class: Optional[str]) -> bool: + """检查是否为目标类别""" + if not target_class: + return True + return detection.get("class") == target_class + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + + +class LeavePostAlgorithm(BaseAlgorithm): """ 离岗检测算法(优化版 v2.0) @@ -46,6 +81,12 @@ class LeavePostAlgorithm: # 告警级别常量(默认值,可通过 params 覆盖) DEFAULT_ALARM_LEVEL = 2 # 普通 + # Step 2: 阈值常量 + RATIO_ON_DUTY_CONFIRM = 0.6 # 上岗确认命中率阈值 + RATIO_OFF_DUTY_TRIGGER = 0.2 # 离岗触发阈值(低于此值进入离岗确认) + RATIO_RETURN_CONFIRM = 0.5 # 回岗确认命中率阈值 + RATIO_OFF_DUTY_CONFIRM = 0.2 # 离岗确认完成阈值 + def __init__( self, confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长) @@ -59,6 +100,8 @@ class LeavePostAlgorithm: # 兼容旧参数名(向后兼容) confirm_leave_sec: Optional[int] = None, ): + super().__init__() + # 时间参数(处理向后兼容) self.confirm_on_duty_sec = confirm_on_duty_sec self.confirm_off_duty_sec = confirm_leave_sec if confirm_leave_sec is not None else confirm_off_duty_sec @@ -75,12 +118,11 @@ class LeavePostAlgorithm: self.state: str = self.STATE_INIT self.state_start_time: Optional[datetime] = None - # 滑动窗口(用于平滑检测结果) - self.detection_window: deque = deque() # [(timestamp, has_person), ...] + # 滑动窗口(用于平滑检测结果)— Step 3: maxlen=1000 + self.detection_window: deque = deque(maxlen=1000) # [(timestamp, has_person), ...] self.window_size_sec = 10 # 滑动窗口大小:10秒 # 告警追踪 - self._last_alarm_id: Optional[str] = None self._leave_start_time: Optional[datetime] = None # 人员离开时间(用于计算持续时长) self._alarm_triggered_time: Optional[datetime] = None # 告警触发时间 self.alert_cooldowns: Dict[str, datetime] = {} @@ -123,17 +165,6 @@ class LeavePostAlgorithm: except: return 0 - def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: - """检查检测结果是否在ROI内""" - matched_rois = detection.get("matched_rois", []) - return any(roi.get("roi_id") == roi_id for roi in matched_rois) - - def _check_target_class(self, detection: Dict, target_class: str) -> bool: - """检查是否为目标类别""" - if not target_class: - return True - return detection.get("class") == target_class - def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: """获取ROI内最新的检测框""" for det in tracks: @@ -191,6 +222,8 @@ class LeavePostAlgorithm: # 更新滑动窗口 self._update_detection_window(current_time, roi_has_person) + + # Step 4: 计算一次比率,后续分支复用 detection_ratio = self._get_detection_ratio() # 检查工作时间 @@ -246,8 +279,8 @@ class LeavePostAlgorithm: self.state_start_time = current_time self.detection_window.clear() logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)") - elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.6: - # 上岗确认成功(命中率>=70%) + elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= self.RATIO_ON_DUTY_CONFIRM: + # 上岗确认成功 self.state = self.STATE_ON_DUTY self.state_start_time = current_time self.alert_cooldowns.clear() # 确认在岗后清除冷却记录 @@ -255,7 +288,7 @@ class LeavePostAlgorithm: elif self.state == self.STATE_ON_DUTY: # 在岗状态:监控是否离岗 - if detection_ratio < 0.2: + if detection_ratio < self.RATIO_OFF_DUTY_TRIGGER: # 滑动窗口内 80% 以上帧无人,进入离岗确认 self.state = self.STATE_CONFIRMING_OFF_DUTY self.state_start_time = current_time @@ -265,12 +298,12 @@ class LeavePostAlgorithm: # 离岗确认中:需要持续未检测到人 elapsed = (current_time - self.state_start_time).total_seconds() - if detection_ratio >= 0.5: + if detection_ratio >= self.RATIO_RETURN_CONFIRM: # 窗口内检测率恢复到 50% 以上,人确实回来了 self.state = self.STATE_ON_DUTY self.state_start_time = current_time logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了, ratio={detection_ratio:.2f})") - elif elapsed >= self.confirm_off_duty_sec and detection_ratio < 0.2: + elif elapsed >= self.confirm_off_duty_sec and detection_ratio < self.RATIO_OFF_DUTY_CONFIRM: # 离岗确认成功,进入倒计时 self.state = self.STATE_OFF_DUTY_COUNTDOWN self.state_start_time = current_time @@ -295,6 +328,9 @@ class LeavePostAlgorithm: bbox = self._get_latest_bbox(tracks, roi_id) + # Bug fix: _leave_start_time None guard + first_frame_time = self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._leave_start_time else current_time.strftime('%Y-%m-%d %H:%M:%S') + alerts.append({ "track_id": roi_id, "camera_id": camera_id, @@ -302,7 +338,7 @@ class LeavePostAlgorithm: "alert_type": "leave_post", "alarm_level": self._alarm_level, "message": "人员离岗告警", - "first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'), + "first_frame_time": first_frame_time, }) self.alert_cooldowns[cooldown_key] = current_time @@ -321,8 +357,13 @@ class LeavePostAlgorithm: # 特殊处理:从CONFIRMING_ON_DUTY再次确认上岗时,如果有未结束的告警,发送resolve if self.state == self.STATE_ON_DUTY and self._last_alarm_id: + # Bug fix: _leave_start_time None guard for resolve event + if self._leave_start_time is not None: + duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) + else: + duration_ms = 0 + # 回岗确认成功,发送resolve事件 - duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, @@ -339,10 +380,6 @@ class LeavePostAlgorithm: return alerts - def set_last_alarm_id(self, alarm_id: str): - """由 main.py 在告警生成后回填 alarm_id""" - self._last_alarm_id = alarm_id - def reset(self): """重置算法状态""" self.state = self.STATE_INIT @@ -355,10 +392,13 @@ class LeavePostAlgorithm: def get_state(self, roi_id: str) -> Dict[str, Any]: """获取当前状态(用于调试和监控)""" + # Step 4: 缓存比率计算 + detection_ratio = self._get_detection_ratio() + state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, - "detection_ratio": self._get_detection_ratio(), + "detection_ratio": detection_ratio, "window_size": len(self.detection_window), } @@ -375,7 +415,7 @@ class LeavePostAlgorithm: return state_info -class IntrusionAlgorithm: +class IntrusionAlgorithm(BaseAlgorithm): """ 周界入侵检测算法(状态机版本 v3.0) @@ -410,6 +450,8 @@ class IntrusionAlgorithm: target_class: Optional[str] = None, alarm_level: Optional[int] = None, ): + super().__init__() + self.cooldown_seconds = cooldown_seconds # 参数兼容处理 @@ -427,7 +469,6 @@ class IntrusionAlgorithm: self.state_start_time: Optional[datetime] = None # 告警追踪 - self._last_alarm_id: Optional[str] = None self._intrusion_start_time: Optional[datetime] = None # CONFIRMING_CLEAR状态下检测到人的时间(用于判断是否持续5秒) @@ -441,24 +482,57 @@ class IntrusionAlgorithm: self.alert_triggered: Dict[str, bool] = {} self.detection_start: Dict[str, Optional[datetime]] = {} - def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: - matched_rois = detection.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True - return False - - def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: - if not target_class: - return True - return detection.get("class") == target_class - def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + """获取ROI内最新的检测框 - IntrusionAlgorithm 不过滤 target_class""" for det in tracks: if self._check_detection_in_roi(det, roi_id): return det.get("bbox", []) return [] + # Step 7: CONFIRMING_CLEAR 逻辑拆分 + def _handle_clear_person_detected(self, roi_id: str, current_time: datetime): + """CONFIRMING_CLEAR 状态下检测到人的处理""" + if self._person_detected_in_clear_time is None: + # 第一次检测到人,记录时间 + self._person_detected_in_clear_time = current_time + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR 检测到人,开始确认(需持续{self.confirm_intrusion_seconds}秒)") + else: + # 持续有人,检查是否达到确认时间 + person_elapsed = (current_time - self._person_detected_in_clear_time).total_seconds() + if person_elapsed >= self.confirm_intrusion_seconds: + # 确认有人重新入侵,回到ALARMED + self.state = self.STATE_ALARMED + self.state_start_time = None + self._person_detected_in_clear_time = None + logger.info(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (确认有人重新入侵,持续{person_elapsed:.1f}秒)") + + def _handle_clear_no_person(self, roi_id: str, current_time: datetime, elapsed: float, alerts: List[Dict]): + """CONFIRMING_CLEAR 状态下无人的处理""" + self._person_detected_in_clear_time = None # 清除临时计时 + + # 检查是否达到消失确认时间 + if elapsed >= self.confirm_clear_seconds: + # 消失确认成功,发送resolve事件 + if self._last_alarm_id and self._intrusion_start_time: + duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "intrusion_cleared", + }) + + logger.info(f"ROI {roi_id}: 告警已解决(入侵消失,持续无人{elapsed:.1f}秒)") + + # 重置状态 + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._intrusion_start_time = None + self._person_detected_in_clear_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)") + def process( self, roi_id: str, @@ -556,54 +630,14 @@ class IntrusionAlgorithm: else: elapsed = (current_time - self.state_start_time).total_seconds() + # Step 7: 使用拆分后的方法 if roi_has_person: - # 检测到有人 - if self._person_detected_in_clear_time is None: - # 第一次检测到人,记录时间 - self._person_detected_in_clear_time = current_time - logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR 检测到人,开始确认(需持续{self.confirm_intrusion_seconds}秒)") - else: - # 持续有人,检查是否达到确认时间 - person_elapsed = (current_time - self._person_detected_in_clear_time).total_seconds() - if person_elapsed >= self.confirm_intrusion_seconds: - # 确认有人重新入侵,回到ALARMED - self.state = self.STATE_ALARMED - self.state_start_time = None - self._person_detected_in_clear_time = None - logger.info(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (确认有人重新入侵,持续{person_elapsed:.1f}秒)") + self._handle_clear_person_detected(roi_id, current_time) else: - # 没有人 - self._person_detected_in_clear_time = None # 清除临时计时 - - # 检查是否达到消失确认时间 - if elapsed >= self.confirm_clear_seconds: - # 消失确认成功,发送resolve事件 - if self._last_alarm_id and self._intrusion_start_time: - duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000) - alerts.append({ - "alert_type": "alarm_resolve", - "resolve_alarm_id": self._last_alarm_id, - "duration_ms": duration_ms, - "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), - "resolve_type": "intrusion_cleared", - }) - - logger.info(f"ROI {roi_id}: 告警已解决(入侵消失,持续无人{elapsed:.1f}秒)") - - # 重置状态 - self.state = self.STATE_IDLE - self.state_start_time = None - self._last_alarm_id = None - self._intrusion_start_time = None - self._person_detected_in_clear_time = None - logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)") + self._handle_clear_no_person(roi_id, current_time, elapsed, alerts) return alerts - def set_last_alarm_id(self, alarm_id: str): - """由 main.py 在告警生成后回填 alarm_id""" - self._last_alarm_id = alarm_id - def reset(self): """重置算法状态""" self.state = self.STATE_IDLE @@ -713,7 +747,7 @@ class IntrusionAlgorithm: # self.alert_triggered.clear() -class IllegalParkingAlgorithm: +class IllegalParkingAlgorithm(BaseAlgorithm): """ 车辆违停检测算法(状态机版本 v1.0) @@ -721,7 +755,7 @@ class IllegalParkingAlgorithm: IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE 业务流程: - 1. 检测到车辆进入禁停区 → 车辆确认期(confirm_vehicle_sec,默认15秒,ratio≥0.6) + 1. 检测到车辆进入禁停区 → 车辆确认期(confirm_vehicle_sec,默认15秒,ratio>=0.6) 2. 确认有车 → 违停倒计时(parking_countdown_sec,默认300秒/5分钟) 3. 倒计时结束仍有车 → 触发告警(ALARMED状态) 4. 车辆离开 → 消失确认期(confirm_clear_sec,默认30秒,ratio<0.2) @@ -743,6 +777,14 @@ class IllegalParkingAlgorithm: # 滑动窗口参数 WINDOW_SIZE_SEC = 10 + # Step 2: 阈值常量 + RATIO_CONFIRMING_DROP = 0.3 # 确认期内命中率低于此值则回到IDLE + RATIO_CONFIRM_VEHICLE = 0.6 # 确认有车的命中率阈值 + RATIO_PARKED_LEAVE = 0.2 # 倒计时期间车辆离开的判定阈值 + RATIO_ALARMED_CLEAR = 0.15 # 已告警状态下进入消失确认的阈值 + RATIO_CLEAR_RETURN = 0.5 # 消失确认期间车辆再次出现的阈值 + RATIO_CLEAR_CONFIRM = 0.2 # 消失确认完成的阈值 + def __init__( self, confirm_vehicle_sec: int = 15, @@ -752,6 +794,8 @@ class IllegalParkingAlgorithm: target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, ): + super().__init__() + self.confirm_vehicle_sec = confirm_vehicle_sec self.parking_countdown_sec = parking_countdown_sec self.confirm_clear_sec = confirm_clear_sec @@ -763,23 +807,15 @@ class IllegalParkingAlgorithm: self.state: str = self.STATE_IDLE self.state_start_time: Optional[datetime] = None - # 滑动窗口:存储 (timestamp, has_vehicle: bool) - self._detection_window: deque = deque() + # 滑动窗口:存储 (timestamp, has_vehicle: bool) — Step 3: maxlen=1000 + self._detection_window: deque = deque(maxlen=1000) # 告警追踪 - self._last_alarm_id: Optional[str] = None self._parking_start_time: Optional[datetime] = None # 冷却期管理 self.alert_cooldowns: Dict[str, datetime] = {} - def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: - matched_rois = detection.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True - return False - def _check_target_classes(self, detection: Dict) -> bool: """检查检测目标是否属于车辆类别""" det_class = detection.get("class", "") @@ -799,6 +835,28 @@ class IllegalParkingAlgorithm: hits = sum(1 for _, has in self._detection_window if has) return hits / len(self._detection_window) + # Step 5: 合并遍历方法 + def _scan_tracks(self, tracks: List[Dict], roi_id: str) -> Tuple[bool, int, List[float], float]: + """ + 一次遍历 tracks,返回 (has_target, count, latest_bbox, max_confidence)。 + 过滤 target_classes。 + """ + has_target = False + count = 0 + latest_bbox: List[float] = [] + max_confidence = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + has_target = True + count += 1 + if not latest_bbox: + latest_bbox = det.get("bbox", []) + conf = det.get("confidence", 0.0) + if conf > max_confidence: + max_confidence = conf + return has_target, count, latest_bbox, max_confidence + + # 保留旧方法以防外部调用 def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): @@ -824,14 +882,13 @@ class IllegalParkingAlgorithm: current_time = current_time or datetime.now() alerts = [] - # 检查ROI内是否有车辆 - roi_has_vehicle = any( - self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) - for det in tracks - ) + # Step 5: 一次遍历获取所有信息 + roi_has_vehicle, vehicle_count, scan_bbox, scan_confidence = self._scan_tracks(tracks, roi_id) # 更新滑动窗口 self._update_window(current_time, roi_has_vehicle) + + # Step 4: 计算一次比率,后续分支复用 ratio = self._get_window_ratio() # === 状态机处理 === @@ -849,12 +906,12 @@ class IllegalParkingAlgorithm: elapsed = (current_time - self.state_start_time).total_seconds() - if ratio < 0.3: + if ratio < self.RATIO_CONFIRMING_DROP: # 命中率过低,车辆可能只是路过 self.state = self.STATE_IDLE self.state_start_time = None - logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)") - elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6: + logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<{self.RATIO_CONFIRMING_DROP})") + elif elapsed >= self.confirm_vehicle_sec and ratio >= self.RATIO_CONFIRM_VEHICLE: # 确认有车辆停留,进入倒计时 self._parking_start_time = self.state_start_time self.state = self.STATE_PARKED_COUNTDOWN @@ -868,7 +925,7 @@ class IllegalParkingAlgorithm: elapsed = (current_time - self.state_start_time).total_seconds() - if ratio < 0.2: + if ratio < self.RATIO_PARKED_LEAVE: # 车辆已离开 self.state = self.STATE_IDLE self.state_start_time = None @@ -880,16 +937,13 @@ class IllegalParkingAlgorithm: if cooldown_key not in self.alert_cooldowns or \ (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: - bbox = self._get_latest_bbox(tracks, roi_id) - confidence = self._get_max_confidence(tracks, roi_id) - alerts.append({ "roi_id": roi_id, "camera_id": camera_id, - "bbox": bbox, + "bbox": scan_bbox, "alert_type": "illegal_parking", "alarm_level": self._alarm_level, - "confidence": confidence, + "confidence": scan_confidence, "message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)", "first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None, "duration_minutes": elapsed / 60, @@ -905,10 +959,10 @@ class IllegalParkingAlgorithm: logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)") elif self.state == self.STATE_ALARMED: - if ratio < 0.15: + if ratio < self.RATIO_ALARMED_CLEAR: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<0.15)") + logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<{self.RATIO_ALARMED_CLEAR})") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: @@ -917,12 +971,12 @@ class IllegalParkingAlgorithm: elapsed = (current_time - self.state_start_time).total_seconds() - if ratio >= 0.5: + if ratio >= self.RATIO_CLEAR_RETURN: # 车辆又出现,回到ALARMED self.state = self.STATE_ALARMED self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)") - elif elapsed >= self.confirm_clear_sec and ratio < 0.2: + elif elapsed >= self.confirm_clear_sec and ratio < self.RATIO_CLEAR_CONFIRM: # 确认车辆已离开 if self._last_alarm_id and self._parking_start_time: duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000) @@ -944,10 +998,6 @@ class IllegalParkingAlgorithm: return alerts - def set_last_alarm_id(self, alarm_id: str): - """由 main.py 在告警生成后回填 alarm_id""" - self._last_alarm_id = alarm_id - def reset(self): """重置算法状态""" self.state = self.STATE_IDLE @@ -960,10 +1010,12 @@ class IllegalParkingAlgorithm: def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: """获取当前状态""" current_time = current_time or datetime.now() + # Step 4: 缓存窗口比率 + window_ratio = self._get_window_ratio() state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, - "window_ratio": self._get_window_ratio(), + "window_ratio": window_ratio, } if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time: state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds() @@ -971,7 +1023,7 @@ class IllegalParkingAlgorithm: return state_info -class VehicleCongestionAlgorithm: +class VehicleCongestionAlgorithm(BaseAlgorithm): """ 车辆拥堵检测算法(状态机版本 v1.0) @@ -979,8 +1031,8 @@ class VehicleCongestionAlgorithm: NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL 业务流程: - 1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期(confirm_congestion_sec,默认60秒) - 2. 确认拥堵(窗口内平均车辆数 ≥ threshold)→ 触发告警 + 1. 检测到车辆数量 >= count_threshold → 拥堵确认期(confirm_congestion_sec,默认60秒) + 2. 确认拥堵(窗口内平均车辆数 >= threshold)→ 触发告警 3. 车辆减少 → 消散确认期(confirm_clear_sec,默认120秒) 4. 确认消散(平均数 < threshold)→ 发送resolve事件 → 回到正常 @@ -999,6 +1051,9 @@ class VehicleCongestionAlgorithm: # 滑动窗口参数 WINDOW_SIZE_SEC = 10 + # Step 2: 阈值常量 — Step 6: 默认消散比例 + DISSIPATION_RATIO = 0.5 + def __init__( self, count_threshold: int = 5, @@ -1007,35 +1062,31 @@ class VehicleCongestionAlgorithm: cooldown_sec: int = 1800, target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, + dissipation_ratio: float = 0.5, # Step 6: 消散比例可配置 ): + super().__init__() + self.count_threshold = count_threshold self.confirm_congestion_sec = confirm_congestion_sec self.confirm_clear_sec = confirm_clear_sec self.cooldown_sec = cooldown_sec self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL + self.dissipation_ratio = dissipation_ratio # Step 6 # 状态变量 self.state: str = self.STATE_NORMAL self.state_start_time: Optional[datetime] = None - # 滑动窗口:存储 (timestamp, vehicle_count: int) - self._count_window: deque = deque() + # 滑动窗口:存储 (timestamp, vehicle_count: int) — Step 3: maxlen=1000 + self._count_window: deque = deque(maxlen=1000) # 告警追踪 - self._last_alarm_id: Optional[str] = None self._congestion_start_time: Optional[datetime] = None # 冷却期管理 self.alert_cooldowns: Dict[str, datetime] = {} - def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: - matched_rois = detection.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True - return False - def _check_target_classes(self, detection: Dict) -> bool: det_class = detection.get("class", "") return det_class in self.target_classes @@ -1061,6 +1112,28 @@ class VehicleCongestionAlgorithm: total = sum(c for _, c in self._count_window) return total / len(self._count_window) + # Step 5: 合并遍历方法 + def _scan_tracks(self, tracks: List[Dict], roi_id: str) -> Tuple[bool, int, List[float], float]: + """ + 一次遍历 tracks,返回 (has_target, count, latest_bbox, max_confidence)。 + 过滤 target_classes。 + """ + has_target = False + count = 0 + latest_bbox: List[float] = [] + max_confidence = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + has_target = True + count += 1 + if not latest_bbox: + latest_bbox = det.get("bbox", []) + conf = det.get("confidence", 0.0) + if conf > max_confidence: + max_confidence = conf + return has_target, count, latest_bbox, max_confidence + + # 保留旧方法以防外部调用 def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: max_conf = 0.0 for det in tracks: @@ -1085,18 +1158,24 @@ class VehicleCongestionAlgorithm: current_time = current_time or datetime.now() alerts = [] - # 统计ROI内车辆数 - vehicle_count = self._count_vehicles_in_roi(tracks, roi_id) + # Step 5: 一次遍历获取所有信息 + _has_target, vehicle_count, scan_bbox, scan_confidence = self._scan_tracks(tracks, roi_id) + self._update_count_window(current_time, vehicle_count) + + # Step 4: 计算一次均值,后续分支复用 avg_count = self._get_avg_count() + # Step 6: 消散阈值使用可配置比例 + dissipation_threshold = self.count_threshold * self.dissipation_ratio + # === 状态机处理 === if self.state == self.STATE_NORMAL: if avg_count >= self.count_threshold: self.state = self.STATE_CONFIRMING_CONGESTION self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}≥{self.count_threshold})") + logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}>={self.count_threshold})") elif self.state == self.STATE_CONFIRMING_CONGESTION: if self.state_start_time is None: @@ -1117,16 +1196,14 @@ class VehicleCongestionAlgorithm: (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: self._congestion_start_time = self.state_start_time - bbox = self._get_latest_bbox(tracks, roi_id) - confidence = self._get_max_confidence(tracks, roi_id) alerts.append({ "roi_id": roi_id, "camera_id": camera_id, - "bbox": bbox, + "bbox": scan_bbox, "alert_type": "vehicle_congestion", "alarm_level": self._alarm_level, - "confidence": confidence, + "confidence": scan_confidence, "message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)", "first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None, "vehicle_count": int(avg_count), @@ -1141,11 +1218,11 @@ class VehicleCongestionAlgorithm: logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)") elif self.state == self.STATE_CONGESTED: - # 车辆数降到阈值的一半以下才开始确认消散(避免抖动) - if avg_count < self.count_threshold * 0.5: + # Step 6: 使用可配置的消散比例 + if avg_count < dissipation_threshold: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold * 0.5:.1f})") + logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{dissipation_threshold:.1f})") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: @@ -1158,7 +1235,7 @@ class VehicleCongestionAlgorithm: # 又拥堵了,回到CONGESTED self.state = self.STATE_CONGESTED self.state_start_time = None - logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}≥{self.count_threshold})") + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}>={self.count_threshold})") elif elapsed >= self.confirm_clear_sec: # 确认消散 if self._last_alarm_id and self._congestion_start_time: @@ -1181,10 +1258,6 @@ class VehicleCongestionAlgorithm: return alerts - def set_last_alarm_id(self, alarm_id: str): - """由 main.py 在告警生成后回填 alarm_id""" - self._last_alarm_id = alarm_id - def reset(self): """重置算法状态""" self.state = self.STATE_NORMAL @@ -1197,10 +1270,12 @@ class VehicleCongestionAlgorithm: def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: """获取当前状态""" current_time = current_time or datetime.now() + # Step 4: 缓存均值计算 + avg_vehicle_count = self._get_avg_count() state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, - "avg_vehicle_count": self._get_avg_count(), + "avg_vehicle_count": avg_vehicle_count, } if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time: state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds() @@ -1215,6 +1290,7 @@ class AlgorithmManager: self._update_lock = threading.Lock() self._registered_keys: set = set() # 已注册的 (roi_id, bind_id, algo_type) 缓存 + # Bug fix: 默认参数与算法构造函数一致 self.default_params = { "leave_post": { "confirm_on_duty_sec": 10, @@ -1225,28 +1301,29 @@ class AlgorithmManager: "intrusion": { "cooldown_seconds": 300, "confirm_seconds": 5, + "confirm_clear_seconds": 180, # Bug fix: 添加与构造函数一致的默认值 "target_class": None, }, "illegal_parking": { "confirm_vehicle_sec": 15, "parking_countdown_sec": 300, - "confirm_clear_sec": 30, - "cooldown_sec": 600, + "confirm_clear_sec": 120, # Bug fix: 与算法构造函数默认值一致(120,非30) + "cooldown_sec": 1800, # Bug fix: 与算法构造函数默认值一致(1800,非600) "target_classes": ["car", "truck", "bus", "motorcycle"], }, "vehicle_congestion": { - "count_threshold": 3, + "count_threshold": 5, # Bug fix: 与算法构造函数默认值一致(5,非3) "confirm_congestion_sec": 60, - "confirm_clear_sec": 120, - "cooldown_sec": 600, + "confirm_clear_sec": 180, # Bug fix: 与算法构造函数默认值一致(180,非120) + "cooldown_sec": 1800, # Bug fix: 与算法构造函数默认值一致(1800,非600) "target_classes": ["car", "truck", "bus", "motorcycle"], }, } - + self._pubsub = None self._pubsub_thread = None self._running = False - + def start_config_subscription(self): """启动配置变更订阅""" try: @@ -1262,10 +1339,10 @@ class AlgorithmManager: password=settings.redis.password, decode_responses=True, ) - + self._pubsub = redis_client.pubsub() self._pubsub.subscribe("config_update") - + self._running = True self._pubsub_thread = threading.Thread( target=self._config_update_worker, @@ -1276,7 +1353,7 @@ class AlgorithmManager: logger.info("已启动配置变更订阅") except Exception as e: logger.error(f"启动配置订阅失败: {e}") - + def _config_update_worker(self): """配置更新订阅工作线程""" try: @@ -1303,13 +1380,13 @@ class AlgorithmManager: else: self.reload_all_algorithms() else: - # type="full" / "camera" / unknown → 全量重载 + # type="full" / "camera" / unknown -> 全量重载 self.reload_all_algorithms() except Exception as e: logger.error(f"处理配置更新消息失败: {e}") except Exception as e: logger.error(f"配置订阅线程异常: {e}") - + def stop_config_subscription(self): """停止配置变更订阅""" self._running = False @@ -1318,17 +1395,17 @@ class AlgorithmManager: if self._pubsub_thread and self._pubsub_thread.is_alive(): self._pubsub_thread.join(timeout=5) logger.info("配置订阅已停止") - + def load_bind_from_redis(self, bind_id: str) -> bool: """从Redis加载单个绑定配置的算法""" try: from core.config_sync import get_config_sync_manager config_manager = get_config_sync_manager() bind_config = config_manager.get_algo_bind_from_redis(bind_id) - + if not bind_config: return False - + with self._update_lock: roi_id = bind_config.get("roi_id") algo_code = bind_config.get("algo_code", "leave_post") @@ -1343,12 +1420,12 @@ class AlgorithmManager: params = raw_params else: params = {} - + if roi_id not in self.algorithms: self.algorithms[roi_id] = {} - + key = f"{roi_id}_{bind_id}" - + if algo_code == "leave_post": configured_alarm_level = params.get("alarm_level") algo_params = { @@ -1396,36 +1473,100 @@ class AlgorithmManager: alarm_level=configured_alarm_level, ) logger.info(f"已从Redis加载算法: {key}") - + # Bug fix: 热更新支持 illegal_parking 和 vehicle_congestion + elif algo_code == "illegal_parking": + configured_alarm_level = params.get("alarm_level") + algo_params = { + "confirm_vehicle_sec": params.get("confirm_vehicle_sec", 15), + "parking_countdown_sec": params.get("parking_countdown_sec", 300), + "confirm_clear_sec": params.get("confirm_clear_sec", 120), + "cooldown_sec": params.get("cooldown_sec", 1800), + "target_classes": params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + } + if key in self.algorithms.get(roi_id, {}) and "illegal_parking" in self.algorithms[roi_id].get(key, {}): + algo = self.algorithms[roi_id][key]["illegal_parking"] + algo.confirm_vehicle_sec = algo_params["confirm_vehicle_sec"] + algo.parking_countdown_sec = algo_params["parking_countdown_sec"] + algo.confirm_clear_sec = algo_params["confirm_clear_sec"] + algo.cooldown_sec = algo_params["cooldown_sec"] + algo.target_classes = algo_params["target_classes"] + if configured_alarm_level is not None: + algo._alarm_level = configured_alarm_level + logger.info(f"已热更新违停算法参数: {key}") + else: + self.algorithms[roi_id][key] = {} + self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( + confirm_vehicle_sec=algo_params["confirm_vehicle_sec"], + parking_countdown_sec=algo_params["parking_countdown_sec"], + confirm_clear_sec=algo_params["confirm_clear_sec"], + cooldown_sec=algo_params["cooldown_sec"], + target_classes=algo_params["target_classes"], + alarm_level=configured_alarm_level, + ) + logger.info(f"已从Redis加载违停算法: {key}") + elif algo_code == "vehicle_congestion": + configured_alarm_level = params.get("alarm_level") + algo_params = { + "count_threshold": params.get("count_threshold", 5), + "confirm_congestion_sec": params.get("confirm_congestion_sec", 60), + "confirm_clear_sec": params.get("confirm_clear_sec", 180), + "cooldown_sec": params.get("cooldown_sec", 1800), + "target_classes": params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + "dissipation_ratio": params.get("dissipation_ratio", 0.5), + } + if key in self.algorithms.get(roi_id, {}) and "vehicle_congestion" in self.algorithms[roi_id].get(key, {}): + algo = self.algorithms[roi_id][key]["vehicle_congestion"] + algo.count_threshold = algo_params["count_threshold"] + algo.confirm_congestion_sec = algo_params["confirm_congestion_sec"] + algo.confirm_clear_sec = algo_params["confirm_clear_sec"] + algo.cooldown_sec = algo_params["cooldown_sec"] + algo.target_classes = algo_params["target_classes"] + algo.dissipation_ratio = algo_params["dissipation_ratio"] + if configured_alarm_level is not None: + algo._alarm_level = configured_alarm_level + logger.info(f"已热更新拥堵算法参数: {key}") + else: + self.algorithms[roi_id][key] = {} + self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( + count_threshold=algo_params["count_threshold"], + confirm_congestion_sec=algo_params["confirm_congestion_sec"], + confirm_clear_sec=algo_params["confirm_clear_sec"], + cooldown_sec=algo_params["cooldown_sec"], + target_classes=algo_params["target_classes"], + alarm_level=configured_alarm_level, + dissipation_ratio=algo_params["dissipation_ratio"], + ) + logger.info(f"已从Redis加载拥堵算法: {key}") + return True except Exception as e: logger.error(f"从Redis加载算法配置失败: {e}") return False - + def reload_bind_algorithm(self, bind_id: str) -> bool: """重新加载单个绑定的算法配置""" return self.load_bind_from_redis(bind_id) - + def reload_algorithm(self, roi_id: str) -> bool: """重新加载单个ROI的所有算法绑定配置""" try: from core.config_sync import get_config_sync_manager config_manager = get_config_sync_manager() bindings = config_manager.get_bindings_from_redis(roi_id) - + if not bindings: return False - + for bind in bindings: bind_id = bind.get("bind_id") self.reset_algorithm(roi_id, bind_id) self.load_bind_from_redis(bind_id) - + return True except Exception as e: logger.error(f"重新加载ROI算法配置失败: {e}") return False - + def update_algorithm_params(self, roi_id: str, bind_id: str, bind_config: dict) -> bool: """仅更新算法参数,保留状态机 @@ -1491,7 +1632,34 @@ class AlgorithmManager: logger.info(f"[{roi_id}_{bind_id}] 更新周界入侵参数: intrusion={confirm_intrusion_sec}s, clear={confirm_clear_sec}s") - # 其他算法类型可以在此添加 + # Bug fix: 热更新支持 illegal_parking 和 vehicle_congestion + elif algo_code == "illegal_parking": + existing_algo.confirm_vehicle_sec = params.get("confirm_vehicle_sec", 15) + existing_algo.parking_countdown_sec = params.get("parking_countdown_sec", 300) + existing_algo.confirm_clear_sec = params.get("confirm_clear_sec", 120) + existing_algo.cooldown_sec = params.get("cooldown_sec", 1800) + if "target_classes" in params: + existing_algo.target_classes = params["target_classes"] + alarm_level = params.get("alarm_level") + if alarm_level is not None: + existing_algo._alarm_level = alarm_level + + logger.info(f"[{roi_id}_{bind_id}] 更新违停检测参数") + + elif algo_code == "vehicle_congestion": + existing_algo.count_threshold = params.get("count_threshold", 5) + existing_algo.confirm_congestion_sec = params.get("confirm_congestion_sec", 60) + existing_algo.confirm_clear_sec = params.get("confirm_clear_sec", 180) + existing_algo.cooldown_sec = params.get("cooldown_sec", 1800) + if "target_classes" in params: + existing_algo.target_classes = params["target_classes"] + if "dissipation_ratio" in params: + existing_algo.dissipation_ratio = params["dissipation_ratio"] + alarm_level = params.get("alarm_level") + if alarm_level is not None: + existing_algo._alarm_level = alarm_level + + logger.info(f"[{roi_id}_{bind_id}] 更新拥堵检测参数") return True @@ -1565,7 +1733,7 @@ class AlgorithmManager: except Exception as e: logger.error(f"重新加载所有算法配置失败: {e}") return count - + def register_algorithm( self, roi_id: str, @@ -1581,13 +1749,13 @@ class AlgorithmManager: return key = f"{roi_id}_{bind_id}" - + if roi_id not in self.algorithms: self.algorithms[roi_id] = {} - + if key not in self.algorithms[roi_id]: self.algorithms[roi_id][key] = {} - + algo_params = self.default_params.get(algorithm_type, {}).copy() if params: algo_params.update(params) @@ -1617,23 +1785,24 @@ class AlgorithmManager: self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15), parking_countdown_sec=algo_params.get("parking_countdown_sec", 300), - confirm_clear_sec=algo_params.get("confirm_clear_sec", 30), - cooldown_sec=algo_params.get("cooldown_sec", 600), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), + cooldown_sec=algo_params.get("cooldown_sec", 1800), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), alarm_level=configured_alarm_level, ) elif algorithm_type == "vehicle_congestion": self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( - count_threshold=algo_params.get("count_threshold", 3), + count_threshold=algo_params.get("count_threshold", 5), confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60), - confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), - cooldown_sec=algo_params.get("cooldown_sec", 600), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 180), + cooldown_sec=algo_params.get("cooldown_sec", 1800), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), alarm_level=configured_alarm_level, + dissipation_ratio=algo_params.get("dissipation_ratio", 0.5), ) self._registered_keys.add(cache_key) - + def process( self, roi_id: str, @@ -1664,7 +1833,7 @@ class AlgorithmManager: for param_key, value in params.items(): if hasattr(algo, param_key): setattr(algo, param_key, value) - + def reset_algorithm(self, roi_id: str, bind_id: Optional[str] = None): """重置算法状态(支持绑定ID)""" if roi_id not in self.algorithms: @@ -1688,21 +1857,21 @@ class AlgorithmManager: self._registered_keys = { k for k in self._registered_keys if k[0] != roi_id } - + def reset_all(self): """重置所有算法""" for roi_algorithms in self.algorithms.values(): for bind_algorithms in roi_algorithms.values(): for algo in bind_algorithms.values(): algo.reset() - + def remove_roi(self, roi_id: str): """移除ROI的所有算法""" if roi_id in self.algorithms: for key in list(self.algorithms[roi_id].keys()): self.reset_algorithm(roi_id, key.split("_")[-1] if "_" in key else None) del self.algorithms[roi_id] - + def remove_bind(self, roi_id: str, bind_id: str): """移除绑定的算法""" key = f"{roi_id}_{bind_id}" @@ -1710,7 +1879,7 @@ class AlgorithmManager: for algo in self.algorithms[roi_id][key].values(): algo.reset() del self.algorithms[roi_id][key] - + def get_status(self, roi_id: str) -> Dict[str, Any]: """获取算法状态""" status = {} diff --git a/config/config_models.py b/config/config_models.py index 1579098..7ec803d 100644 --- a/config/config_models.py +++ b/config/config_models.py @@ -20,6 +20,8 @@ class AlgorithmType(str, Enum): """算法类型枚举""" LEAVE_POST = "leave_post" INTRUSION = "intrusion" + ILLEGAL_PARKING = "illegal_parking" + VEHICLE_CONGESTION = "vehicle_congestion" CROWD_DETECTION = "crowd_detection" FACE_RECOGNITION = "face_recognition" diff --git a/docs/code_review_report.md b/docs/code_review_report.md new file mode 100644 index 0000000..0776445 --- /dev/null +++ b/docs/code_review_report.md @@ -0,0 +1,646 @@ +# algorithms.py 代码审查报告 + +> 审查日期: 2026-04-02 +> 审查文件: algorithms.py (1733行) +> 审查范围: LeavePostAlgorithm, IntrusionAlgorithm, IllegalParkingAlgorithm, VehicleCongestionAlgorithm, AlgorithmManager + +--- + +## 1. 功能基线清单 + +### 1.1 LeavePostAlgorithm (离岗检测) + +**状态定义 (7个):** + +| 状态 | 常量 | 含义 | +|------|------|------| +| INIT | `STATE_INIT` | 初始化,等待检测到人 | +| CONFIRMING_ON_DUTY | `STATE_CONFIRMING_ON_DUTY` | 上岗确认中(需持续检测到人) | +| ON_DUTY | `STATE_ON_DUTY` | 已确认在岗 | +| CONFIRMING_OFF_DUTY | `STATE_CONFIRMING_OFF_DUTY` | 离岗确认中(持续未检测到人) | +| OFF_DUTY_COUNTDOWN | `STATE_OFF_DUTY_COUNTDOWN` | 离岗倒计时(确认离岗后等待告警) | +| ALARMED | `STATE_ALARMED` | 已告警(等待回岗) | +| NON_WORK_TIME | `STATE_NON_WORK_TIME` | 非工作时间 | + +**状态转换矩阵:** + +| 从 \ 到 | INIT | CONFIRMING_ON_DUTY | ON_DUTY | CONFIRMING_OFF_DUTY | OFF_DUTY_COUNTDOWN | ALARMED | NON_WORK_TIME | +|---------|------|--------------------|---------|--------------------|-------------------|---------|--------------| +| INIT | - | roi_has_person==True | - | - | - | - | not in_working_hours | +| CONFIRMING_ON_DUTY | detection_ratio==0 | - | elapsed>=confirm_on_duty_sec AND ratio>=0.6 | - | - | - | not in_working_hours | +| ON_DUTY | - | - | - | detection_ratio<0.2 | - | - | not in_working_hours | +| CONFIRMING_OFF_DUTY | - | - | detection_ratio>=0.5 | - | elapsed>=confirm_off_duty_sec AND ratio<0.2 | - | not in_working_hours | +| OFF_DUTY_COUNTDOWN | - | - | roi_has_person==True | - | - | elapsed>=leave_countdown_sec AND cooldown ok | not in_working_hours | +| ALARMED | - | roi_has_person==True | - | - | - | - | not in_working_hours | +| NON_WORK_TIME | in_working_hours | - | - | - | - | - | - | + +**关键行为:** +- 使用滑动窗口(10秒)平滑检测结果,计算 detection_ratio +- ALARMED -> CONFIRMING_ON_DUTY 复用上岗确认状态(不是独立的回岗确认状态) +- 进入 ON_DUTY 状态时,若存在 `_last_alarm_id`,自动发送 `alarm_resolve` 事件 +- 进入 NON_WORK_TIME 时,若有未结束告警,发送 resolve_type="non_work_time" 的 resolve 事件 +- 冷却期检查使用 `cooldown_key = f"{camera_id}_{roi_id}"` +- `_last_alarm_id` 由外部 main.py 通过 `set_last_alarm_id()` 回填 +- `_leave_start_time` 在进入 OFF_DUTY_COUNTDOWN 时记录(值等于 state_start_time) + +**构造函数参数:** +- `confirm_on_duty_sec`: int = 10 (上岗确认窗口) +- `confirm_off_duty_sec`: int = 30 (离岗确认窗口) +- `confirm_return_sec`: int = 10 (回岗确认窗口 -- 注意: 代码中实际未使用此参数) +- `leave_countdown_sec`: int = 300 (离岗倒计时) +- `cooldown_sec`: int = 600 (告警冷却期) +- `working_hours`: Optional[List[Dict]] = None +- `target_class`: Optional[str] = "person" +- `alarm_level`: Optional[int] = None (默认2) +- `confirm_leave_sec`: Optional[int] = None (向后兼容旧参数名) + +### 1.2 IntrusionAlgorithm (周界入侵) + +**状态定义 (4个):** + +| 状态 | 常量 | 含义 | +|------|------|------| +| IDLE | `STATE_IDLE` | 空闲,无入侵 | +| CONFIRMING_INTRUSION | `STATE_CONFIRMING_INTRUSION` | 入侵确认中 | +| ALARMED | `STATE_ALARMED` | 已告警(等待入侵消失) | +| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 入侵消失确认中 | + +**状态转换矩阵:** + +| 从 \ 到 | IDLE | CONFIRMING_INTRUSION | ALARMED | CONFIRMING_CLEAR | +|---------|------|---------------------|---------|-----------------| +| IDLE | - | roi_has_person==True | - | - | +| CONFIRMING_INTRUSION | not roi_has_person OR cooldown内 OR state_start_time==None | - | elapsed>=confirm_intrusion_seconds AND cooldown ok | - | +| ALARMED | - | - | - | not roi_has_person | +| CONFIRMING_CLEAR | elapsed>=confirm_clear_seconds AND no person | - | person_elapsed>=confirm_intrusion_seconds (持续有人) OR state_start_time==None | - | + +**关键行为:** +- 不使用滑动窗口,直接使用当前帧的 `roi_has_person` 判断 +- CONFIRMING_CLEAR 有子状态追踪: `_person_detected_in_clear_time` 用于判断短暂有人 vs 持续有人 +- 冷却期内入侵确认直接回到 IDLE(不触发告警) +- 包含防御性编程: state_start_time==None 时重置到 IDLE +- `_check_target_class` 允许 target_class 为 None(匹配所有类别) + +**构造函数参数:** +- `cooldown_seconds`: int = 300 +- `confirm_seconds`: int = 5 (向后兼容) +- `confirm_intrusion_seconds`: Optional[int] = None (默认使用 confirm_seconds) +- `confirm_clear_seconds`: Optional[int] = None (默认180) +- `target_class`: Optional[str] = None +- `alarm_level`: Optional[int] = None (默认1) + +### 1.3 IllegalParkingAlgorithm (车辆违停) + +**状态定义 (5个):** + +| 状态 | 常量 | 含义 | +|------|------|------| +| IDLE | `STATE_IDLE` | 空闲 | +| CONFIRMING_VEHICLE | `STATE_CONFIRMING_VEHICLE` | 车辆确认中 | +| PARKED_COUNTDOWN | `STATE_PARKED_COUNTDOWN` | 违停倒计时 | +| ALARMED | `STATE_ALARMED` | 已告警 | +| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 消失确认中 | + +**状态转换矩阵:** + +| 从 \ 到 | IDLE | CONFIRMING_VEHICLE | PARKED_COUNTDOWN | ALARMED | CONFIRMING_CLEAR | +|---------|------|-------------------|-----------------|---------|-----------------| +| IDLE | - | roi_has_vehicle | - | - | - | +| CONFIRMING_VEHICLE | ratio<0.3 OR state_start_time==None | - | elapsed>=confirm_vehicle_sec AND ratio>=0.6 | - | - | +| PARKED_COUNTDOWN | ratio<0.2 (车离开) OR cooldown内 OR state_start_time==None | - | - | elapsed>=parking_countdown_sec AND cooldown ok | - | +| ALARMED | - | - | - | - | ratio<0.15 | +| CONFIRMING_CLEAR | elapsed>=confirm_clear_sec AND ratio<0.2 OR state_start_time==None | - | - | ratio>=0.5 | - | + +**关键行为:** +- 使用滑动窗口(WINDOW_SIZE_SEC=10秒) +- 支持多类车辆: target_classes 默认 ["car", "truck", "bus", "motorcycle"] +- 告警字段包含 `confidence` 和 `duration_minutes` +- CONFIRMING_CLEAR -> IDLE 时清除 alert_cooldowns(新车违停可正常告警) +- ALARMED 进入 CONFIRMING_CLEAR 的阈值(0.15)比其他算法更严格 + +**构造函数参数:** +- `confirm_vehicle_sec`: int = 15 +- `parking_countdown_sec`: int = 300 +- `confirm_clear_sec`: int = 120 +- `cooldown_sec`: int = 1800 +- `target_classes`: Optional[List[str]] = None (默认 ["car", "truck", "bus", "motorcycle"]) +- `alarm_level`: Optional[int] = None (默认1) + +### 1.4 VehicleCongestionAlgorithm (车辆拥堵) + +**状态定义 (4个):** + +| 状态 | 常量 | 含义 | +|------|------|------| +| NORMAL | `STATE_NORMAL` | 正常 | +| CONFIRMING_CONGESTION | `STATE_CONFIRMING_CONGESTION` | 拥堵确认中 | +| CONGESTED | `STATE_CONGESTED` | 拥堵中 | +| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 消散确认中 | + +**状态转换矩阵:** + +| 从 \ 到 | NORMAL | CONFIRMING_CONGESTION | CONGESTED | CONFIRMING_CLEAR | +|---------|--------|----------------------|-----------|-----------------| +| NORMAL | - | avg_count >= count_threshold | - | - | +| CONFIRMING_CONGESTION | avg_count < count_threshold OR cooldown内 OR state_start_time==None | - | elapsed >= confirm_congestion_sec AND cooldown ok | - | +| CONGESTED | - | - | - | avg_count < count_threshold * 0.5 | +| CONFIRMING_CLEAR | elapsed >= confirm_clear_sec OR state_start_time==None | - | avg_count >= count_threshold | - | + +**关键行为:** +- 使用滑动窗口(WINDOW_SIZE_SEC=10秒)存储车辆计数,取平均值判断 +- 消散需车辆数降到阈值的 **50%** 以下才开始确认(避免抖动) +- CONFIRMING_CLEAR -> NORMAL 时清除 alert_cooldowns +- 告警字段包含 `vehicle_count` 和 `confidence` + +**构造函数参数:** +- `count_threshold`: int = 5 +- `confirm_congestion_sec`: int = 60 +- `confirm_clear_sec`: int = 180 +- `cooldown_sec`: int = 1800 +- `target_classes`: Optional[List[str]] = None (默认 ["car", "truck", "bus", "motorcycle"]) +- `alarm_level`: Optional[int] = None (默认2) + +### 1.5 AlgorithmManager + +**数据结构:** +``` +self.algorithms: Dict[str, Dict[str, Dict[str, Algorithm]]] + 结构: { roi_id: { "{roi_id}_{bind_id}": { algo_type: algo_instance } } } +``` + +**公开方法:** +- `start_config_subscription()` - 启动 Redis 配置订阅 +- `stop_config_subscription()` - 停止配置订阅 +- `load_bind_from_redis(bind_id)` - 从 Redis 加载单个绑定配置 +- `reload_bind_algorithm(bind_id)` - 重载单个绑定 +- `reload_algorithm(roi_id)` - 重载单个 ROI 的所有算法 +- `update_algorithm_params(roi_id, bind_id, bind_config)` - 仅更新参数,保留状态 +- `reload_all_algorithms(preserve_state=True)` - 重载全部算法 +- `register_algorithm(roi_id, bind_id, algorithm_type, params)` - 注册算法(带缓存) +- `process(roi_id, bind_id, camera_id, algorithm_type, tracks, current_time)` - 处理检测结果 +- `update_roi_params(roi_id, bind_id, algorithm_type, params)` - 更新参数 +- `reset_algorithm(roi_id, bind_id=None)` - 重置算法状态 +- `reset_all()` - 重置所有算法 +- `remove_roi(roi_id)` - 移除 ROI +- `remove_bind(roi_id, bind_id)` - 移除绑定 +- `get_status(roi_id)` - 获取状态 + +--- + +## 2. 接口契约清单 + +### 2.1 process() 方法统一签名 + +所有四个算法的 `process()` 方法具有相同签名: + +```python +def process( + self, + roi_id: str, + camera_id: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, +) -> List[Dict] +``` + +**tracks 输入格式 (每个元素):** +```python +{ + "track_id": str, # 跟踪ID + "class": str, # 检测类别 ("person", "car", "truck", ...) + "confidence": float, # 置信度 + "bbox": List[float], # 边界框 [x1, y1, x2, y2] + "matched_rois": [ # 匹配的ROI列表 + {"roi_id": str} + ], +} +``` + +### 2.2 告警输出格式 + +#### LeavePostAlgorithm 告警: +```python +{ + "track_id": str, # 等于 roi_id + "camera_id": str, + "bbox": List[float], # 可能为空 [] + "alert_type": "leave_post", + "alarm_level": int, # 默认 2 + "message": "人员离岗告警", + "first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S' +} +``` +注意: leave_post 告警使用 `track_id` 而非 `roi_id` 字段名(与其他算法不同)。 + +#### IntrusionAlgorithm 告警: +```python +{ + "roi_id": str, + "camera_id": str, + "bbox": List[float], + "alert_type": "intrusion", + "alarm_level": int, # 默认 1 + "message": "检测到周界入侵", + "first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S' +} +``` + +#### IllegalParkingAlgorithm 告警: +```python +{ + "roi_id": str, + "camera_id": str, + "bbox": List[float], + "alert_type": "illegal_parking", + "alarm_level": int, # 默认 1 + "confidence": float, + "message": str, # 动态生成,包含停留分钟数 + "first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S',可能为 None + "duration_minutes": float, +} +``` + +#### VehicleCongestionAlgorithm 告警: +```python +{ + "roi_id": str, + "camera_id": str, + "bbox": List[float], + "alert_type": "vehicle_congestion", + "alarm_level": int, # 默认 2 + "confidence": float, + "message": str, # 动态生成,包含平均车辆数和持续秒数 + "first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S',可能为 None + "vehicle_count": int, +} +``` + +#### alarm_resolve 事件 (所有算法统一格式): +```python +{ + "alert_type": "alarm_resolve", + "resolve_alarm_id": str, + "duration_ms": int, + "last_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S' + "resolve_type": str, # "person_returned" | "non_work_time" | "intrusion_cleared" | "vehicle_left" | "congestion_cleared" +} +``` + +### 2.3 AlgorithmManager.process() 签名 + +```python +def process( + self, + roi_id: str, + bind_id: str, + camera_id: str, + algorithm_type: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, +) -> List[Dict] +``` + +### 2.4 AlgorithmManager.register_algorithm() 签名 + +```python +def register_algorithm( + self, + roi_id: str, + bind_id: str, + algorithm_type: str, # "leave_post" | "intrusion" | "illegal_parking" | "vehicle_congestion" + params: Optional[Dict[str, Any]] = None, +) +``` + +--- + +## 3. 已发现的潜在问题 + +### 3.1 Critical (必须修复) + +**[C1] LeavePostAlgorithm: `confirm_return_sec` 参数声明但从未使用** +- 位置: 第53行声明,但状态机中 ALARMED -> CONFIRMING_ON_DUTY -> ON_DUTY 的转换直接复用 `confirm_on_duty_sec` +- 影响: 使用者设置 `confirm_return_sec` 以为可以独立控制回岗确认时长,但实际无效 +- 建议: 文档中声明此参数复用 `confirm_on_duty_sec`,或实现独立的回岗确认逻辑 + +**[C2] LeavePostAlgorithm: resolve 事件的 duration_ms 计算依赖 `_leave_start_time`,但该值可能为 None** +- 位置: 第325行 `duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)` +- 场景: 如果算法在 OFF_DUTY_COUNTDOWN 之前(即 _leave_start_time 赋值之前)因某种异常跳到 ON_DUTY 且 _last_alarm_id 非空,会抛出 TypeError +- 风险: 低概率但会导致该帧整个 process 调用抛异常 + +**[C3] LeavePostAlgorithm: `_leave_start_time` 赋值时机问题** +- 位置: 第277行 `self._leave_start_time = self.state_start_time` +- `state_start_time` 此时是 CONFIRMING_OFF_DUTY 的开始时间(非 OFF_DUTY_COUNTDOWN 的开始时间,因为 state_start_time 在下一行第276行才被更新为 current_time) +- 实际效果: `_leave_start_time` 记录的是**离岗确认开始时间**,不是**倒计时开始时间** +- 审查结论: 这是有意设计,离开时间应该从人离开被确认开始计算,但代码注释"记录离开时间"可能造成误解 + +### 3.2 Important (应该修复) + +**[I1] LeavePostAlgorithm 告警字典使用 `track_id` 而非 `roi_id`** +- 位置: 第299行 `"track_id": roi_id` +- 其他三个算法统一使用 `"roi_id": roi_id` +- 影响: main.py 中的 `_handle_detections` 不直接使用此字段(它有自己的 roi_id),所以不影响功能,但接口不一致 + +**[I2] AlgorithmManager 缺乏线程安全** +- `process()` 方法未加锁(第1637-1651行),而 `register_algorithm()` 也未加锁 +- `_update_lock` 仅在 `load_bind_from_redis` 和 `reload_all_algorithms` 中使用 +- 风险: 如果 config_update_worker 线程调用 `reload_all_algorithms` 同时主线程调用 `process`,可能读到不一致的 `self.algorithms` 字典 +- 缓解: Python GIL 在字典读操作上提供了一定程度的原子性保护,实际崩溃概率很低 + +**[I3] AlgorithmManager.default_params 中 illegal_parking 的 confirm_clear_sec 默认值(30) 与 IllegalParkingAlgorithm 构造函数默认值(120) 不一致** +- 位置: 第1233行 vs 第751行 +- 影响: 通过 AlgorithmManager 创建的 illegal_parking 算法 confirm_clear_sec 为 30,直接创建为 120 + +**[I4] AlgorithmManager.default_params 中 vehicle_congestion 的 count_threshold 默认值(3) 与 VehicleCongestionAlgorithm 构造函数默认值(5) 不一致** +- 位置: 第1237行 vs 第1004行 +- 影响: 通过 AlgorithmManager 创建的算法阈值为 3,直接创建为 5 + +**[I5] `update_algorithm_params` 仅支持 leave_post 和 intrusion** +- 位置: 第1461-1496行 +- 缺少 illegal_parking 和 vehicle_congestion 的参数更新逻辑(第1494行注释 "其他算法类型可以在此添加") +- 影响: `reload_all_algorithms(preserve_state=True)` 对 illegal_parking/vehicle_congestion 会回退到 `load_bind_from_redis`,会重置算法状态 + +**[I6] `load_bind_from_redis` 仅支持 leave_post 和 intrusion** +- 位置: 第1322-1403行 +- 缺少 illegal_parking 和 vehicle_congestion 的 Redis 加载逻辑 +- 影响: 从 Redis 热更新配置时,这两种算法无法被加载 + +**[I7] IntrusionAlgorithm: `_get_latest_bbox` 不检查 target_class** +- 位置: 第456-460行 +- 与 LeavePostAlgorithm 不同(第139-142行会检查 target_class) +- 影响: 可能返回非目标类别的 bbox + +**[I8] `_is_in_working_hours` 不支持跨午夜时间段** +- 位置: 第112行 `if start_minutes <= current_minutes < end_minutes` +- 如果 working_hours 配置为 `{"start": "22:00", "end": "06:00"}`,则无法正确判断 +- 影响: 夜班场景可能不工作 + +### 3.3 Suggestions (建议改进) + +**[S1] 滑动窗口的 window_size_sec 硬编码为 10 秒** +- LeavePostAlgorithm 第80行: `self.window_size_sec = 10` +- IllegalParkingAlgorithm 第744行: `WINDOW_SIZE_SEC = 10` +- VehicleCongestionAlgorithm 第1000行: `WINDOW_SIZE_SEC = 10` +- 建议: 提取为可配置参数 + +**[S2] LeavePostAlgorithm._update_detection_window 与 IllegalParkingAlgorithm._update_window 实现逻辑相同但代码重复** +- 可提取为基类方法或工具函数 + +**[S3] LeavePostAlgorithm.get_state() 使用 datetime.now() 而非参数传入的 current_time** +- 位置: 第367-372行 +- 在测试场景中会导致状态信息不准确(不影响核心逻辑,仅影响监控展示) + +**[S4] AlgorithmManager.get_status() 中 leave_post 分支访问 `alarm_sent` 属性** +- 位置: 第1724行 +- LeavePostAlgorithm 实际上没有 `alarm_sent` 属性,getattr 返回 False +- 这是旧版本残留代码 + +**[S5] AlgorithmManager.remove_roi() 中 bind_id 解析逻辑脆弱** +- 位置: 第1703行 `key.split("_")[-1]` +- 如果 bind_id 本身包含下划线,解析会出错 +- key 格式为 `"{roi_id}_{bind_id}"`,应该用 `key[len(roi_id)+1:]` 提取 bind_id + +**[S6] `_is_in_working_hours` 中的 bare `except:` (第99行, 第124行)** +- 应该至少 except Exception 或更具体的异常类型 + +--- + +## 4. 测试覆盖分析 + +### 4.1 test_leave_post_full_workflow.py 覆盖的场景 + +| 场景 | 状态路径 | 覆盖 | +|------|----------|------| +| 上岗确认成功 | INIT -> CONFIRMING_ON_DUTY -> ON_DUTY | YES | +| 离岗确认 | ON_DUTY -> CONFIRMING_OFF_DUTY -> OFF_DUTY_COUNTDOWN | YES | +| 倒计时触发告警 | OFF_DUTY_COUNTDOWN -> ALARMED | YES | +| 回岗 resolve | ALARMED -> CONFIRMING_ON_DUTY -> ON_DUTY (+ resolve) | YES | +| 告警字段验证 | 无 duration_minutes, 有 first_frame_time | YES | +| resolve 字段验证 | duration_ms, resolve_alarm_id, resolve_type | YES | +| set_last_alarm_id 回填 | - | YES | + +### 4.2 test_vehicle_algorithms.py 覆盖的场景 + +**IllegalParkingAlgorithm:** +| 场景 | 覆盖 | +|------|------| +| 完整生命周期 IDLE->CONFIRMING->COUNTDOWN->ALARMED->CLEAR->IDLE | YES | +| 车辆短暂路过不触发 | YES | +| 多类车辆检测 (truck, bus) | YES | +| person 不触发违停 | YES | +| 冷却期内不重复告警 | YES | +| resolve 事件发送 | YES | + +**VehicleCongestionAlgorithm:** +| 场景 | 覆盖 | +|------|------| +| 完整生命周期 NORMAL->CONFIRMING->CONGESTED->CLEAR->NORMAL | YES | +| 少于阈值不触发 | YES | +| 短暂拥堵不触发 | YES | +| resolve 事件发送 | YES | + +**AlgorithmManager:** +| 场景 | 覆盖 | +|------|------| +| 注册 illegal_parking | YES | +| 注册 vehicle_congestion | YES | +| process 调用 | YES | +| get_status 调用 | YES | +| 重复注册走缓存 | YES | +| reset_algorithm | YES | + +### 4.3 测试覆盖缺口 + +**LeavePostAlgorithm 未覆盖:** +- [ ] CONFIRMING_ON_DUTY -> INIT (人消失) +- [ ] CONFIRMING_OFF_DUTY -> ON_DUTY (人回来,ratio>=0.5) +- [ ] OFF_DUTY_COUNTDOWN -> ON_DUTY (倒计时期间回来) +- [ ] 非工作时间自动 resolve +- [ ] NON_WORK_TIME -> INIT (工作时间恢复) +- [ ] 冷却期内不重复告警 +- [ ] 空 tracks 输入 +- [ ] working_hours 配置解析(字符串格式) + +**IntrusionAlgorithm 完全未测试:** +- [ ] 完整生命周期 IDLE->CONFIRMING->ALARMED->CLEAR->IDLE +- [ ] 入侵确认中人消失 +- [ ] CONFIRMING_CLEAR 中短暂有人 vs 持续有人 +- [ ] 冷却期 +- [ ] resolve 事件 +- [ ] target_class=None 匹配所有类别 +- [ ] state_start_time==None 的防御性代码分支 + +**IllegalParkingAlgorithm 未覆盖:** +- [ ] state_start_time==None 的防御性代码分支 (CONFIRMING_VEHICLE, PARKED_COUNTDOWN, CONFIRMING_CLEAR) +- [ ] CONFIRMING_CLEAR -> ALARMED (车辆又出现, ratio>=0.5) +- [ ] ALARMED 状态下 ratio 在 0.15-0.5 之间(维持 ALARMED) + +**VehicleCongestionAlgorithm 未覆盖:** +- [ ] state_start_time==None 的防御性代码分支 +- [ ] CONFIRMING_CLEAR -> CONGESTED (又拥堵了) +- [ ] 消散阈值 0.5*count_threshold 的边界值 +- [ ] 冷却期测试 + +**AlgorithmManager 未覆盖:** +- [ ] start_config_subscription / stop_config_subscription +- [ ] load_bind_from_redis +- [ ] reload_all_algorithms (含孤立实例清理) +- [ ] update_algorithm_params +- [ ] remove_roi / remove_bind +- [ ] 并发调用安全性 +- [ ] register_algorithm 的 leave_post 和 intrusion 类型 + +--- + +## 5. 优化安全边界 + +### 5.1 不可修改区域 (功能合约) + +以下代码是外部依赖的契约,修改会破坏 main.py 或其他模块: + +1. **所有算法的 `process()` 方法签名** -- main.py 的 `_handle_detections` 直接调用 +2. **告警字典的字段名和类型** -- main.py 的 `_handle_detections` 依赖 `alert_type`, `alarm_level`, `confidence`, `bbox`, `message`, `first_frame_time`, `duration_minutes`, `vehicle_count` +3. **`alarm_resolve` 事件格式** -- main.py 的 resolve 逻辑依赖 `resolve_alarm_id`, `duration_ms`, `last_frame_time`, `resolve_type` +4. **`set_last_alarm_id(alarm_id)` 方法** -- main.py 回填 alarm_id +5. **`reset()` 方法** -- AlgorithmManager 调用 +6. **`get_state()` 方法** -- AlgorithmManager.get_status() 调用 +7. **AlgorithmManager.process() 签名和返回值** -- main.py 直接调用 +8. **AlgorithmManager.register_algorithm() 签名** -- main.py 直接调用 +9. **AlgorithmManager.algorithms 的三层字典结构** -- main.py 直接访问内部实例来获取 `_leave_start_time` 等属性 (第905-911行) + +### 5.2 可安全优化区域 + +以下代码修改不会影响外部行为: + +1. **滑动窗口实现** -- `_update_detection_window`, `_update_window`, `_update_count_window` 的内部实现可以优化,只要 `_get_detection_ratio()`, `_get_window_ratio()`, `_get_avg_count()` 的语义不变 +2. **`_check_detection_in_roi` / `_check_target_class` / `_check_target_classes`** -- 内部实现可优化,接口不变即可 +3. **`_get_latest_bbox` / `_get_max_confidence`** -- 辅助方法,内部实现可优化 +4. **`_is_in_working_hours` / `_parse_time_to_minutes`** -- 内部实现可优化(建议修复跨午夜问题) +5. **AlgorithmManager 的 Redis 相关方法** -- `load_bind_from_redis`, `reload_*`, `_config_update_worker` 可以修改,不影响算法核心逻辑 +6. **日志输出** -- 所有 logger.* 调用可以调整 +7. **`default_params` 字典** -- 可以修正默认值不一致的问题 + +### 5.3 高风险修改区域 (需要完整回归测试) + +1. **状态转换条件 (ratio 阈值)** -- 任何 detection_ratio, window_ratio 的阈值变更都可能影响告警灵敏度 + - LeavePostAlgorithm: 0.6 (上岗), 0.2 (离岗开始), 0.5 (离岗恢复), 0.2 (离岗确认) + - IllegalParkingAlgorithm: 0.3 (放弃确认), 0.6 (确认有车), 0.2 (车离开), 0.15 (开始消失确认), 0.5 (车又来), 0.2 (消失确认) + - VehicleCongestionAlgorithm: count_threshold (开始确认), 0.5*count_threshold (开始消散) + +2. **时间比较逻辑** -- `elapsed >= xxx_sec` 的方向(大于等于 vs 大于) +3. **冷却期检查** -- `cooldown_key` 的构造方式和比较逻辑 +4. **resolve 事件触发逻辑** -- LeavePostAlgorithm 的 "进入 ON_DUTY 且 _last_alarm_id 存在" 的检查 + +### 5.4 重复代码可提取区域 + +以下方法在多个算法中重复实现,可以提取为基类或 mixin: + +| 方法 | 出现在 | 可提取 | +|------|--------|--------| +| `_check_detection_in_roi` | 全部4个 | YES | +| `_check_target_class` | LeavePost, Intrusion | YES | +| `_check_target_classes` | IllegalParking, VehicleCongestion | YES | +| `_get_latest_bbox` | 全部4个 | YES (注意 Intrusion 不检查 target_class) | +| `_get_max_confidence` | IllegalParking, VehicleCongestion | YES | +| `set_last_alarm_id` | 全部4个 | YES | +| 滑动窗口逻辑 | LeavePost, IllegalParking, VehicleCongestion | YES | + +--- + +## 6. config_models.py 与 algorithms.py 的一致性 + +### 6.1 AlgorithmType 枚举缺失 + +`config_models.py` 中的 `AlgorithmType` 枚举: +```python +LEAVE_POST = "leave_post" +INTRUSION = "intrusion" +CROWD_DETECTION = "crowd_detection" # 已在 algorithms.py 中注释掉 +FACE_RECOGNITION = "face_recognition" # algorithms.py 中不存在 +``` + +缺失: +- `ILLEGAL_PARKING = "illegal_parking"` -- algorithms.py 已实现但枚举未添加 +- `VEHICLE_CONGESTION = "vehicle_congestion"` -- algorithms.py 已实现但枚举未添加 + +### 6.2 ROIInfo 默认值 + +`config_models.py` 的 `ROIInfo.confirm_leave_sec` 默认值为 **10**,而 `AlgorithmManager.default_params["leave_post"]["confirm_leave_sec"]` 为 **30**,`LeavePostAlgorithm.confirm_off_duty_sec` 默认为 **30**。 + +--- + +## 7. main.py 集成要点 + +### 7.1 main.py 对 algorithms.py 的依赖 + +1. **直接访问算法内部属性** (第905-911行): + ```python + for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'): + val = getattr(algo, attr, None) + ``` + 这是紧耦合,如果内部属性名变更会导致 first_frame_time 丢失。 + +2. **alarm_id 回填** (第943-945行): + ```python + algo.set_last_alarm_id(alarm_info.alarm_id) + ``` + +3. **两层去重机制**: + - ROI级别: `_active_alarms[f"{roi_id}_{alert_type}"]` + - 摄像头级别: `_camera_alert_cooldown[f"{camera_id}_{alert_type}"]` (30秒冷却) + +4. **duration_ms 在 ext_data 中的计算** (第925行): + ```python + "duration_ms": int(alert.get("duration_minutes", 0) * 60 * 1000) if alert.get("duration_minutes") else None, + ``` + 仅 IllegalParkingAlgorithm 的告警包含 `duration_minutes`,其他算法的 ext_data.duration_ms 为 None。 + +--- + +## 附录: 状态机可视化 + +### LeavePostAlgorithm +``` + +--> NON_WORK_TIME --+ + | (any state) | (in_working_hours) + | v + INIT --+--> CONFIRMING_ON_DUTY ---> ON_DUTY ---> CONFIRMING_OFF_DUTY + ^ | ^ | + +----------+ | v + (ratio==0) | OFF_DUTY_COUNTDOWN + | | | + | (roi_has_person) | v + | +---------------+ ALARMED + | | | + +-------+ (roi_has_person) | + ON_DUTY <-- (if _last_alarm_id, send resolve) +``` + +### IntrusionAlgorithm +``` + IDLE ---> CONFIRMING_INTRUSION ---> ALARMED ---> CONFIRMING_CLEAR ---> IDLE + ^ | | | + +------------+ +--------+ + (person gone) (person back >= confirm_intrusion_sec) + -> ALARMED +``` + +### IllegalParkingAlgorithm +``` + IDLE --> CONFIRMING_VEHICLE --> PARKED_COUNTDOWN --> ALARMED --> CONFIRMING_CLEAR --> IDLE + ^ | | | + +----------+ | v + (ratio<0.3) (ratio<0.2) | ALARMED + ^ | | (ratio>=0.5) + +--------------------------+ v + ALARMED +``` + +### VehicleCongestionAlgorithm +``` + NORMAL --> CONFIRMING_CONGESTION --> CONGESTED --> CONFIRMING_CLEAR --> NORMAL + ^ | | + +--------------+ v + (avg=threshold) +```