import logging import os import sys import threading import time from collections import deque from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np import redis logger = logging.getLogger(__name__) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) class LeavePostAlgorithm: """ 离岗检测算法(优化版 v2.0) 状态机: INIT → CONFIRMING_ON_DUTY → ON_DUTY → CONFIRMING_OFF_DUTY → OFF_DUTY_COUNTDOWN → ALARMED → (回岗) → CONFIRMING_ON_DUTY 业务流程: 1. 启动后检测到人 → 上岗确认期(confirm_on_duty_sec,默认10秒) 2. 确认上岗后 → 在岗状态(ON_DUTY) 3. 人离开ROI → 离岗确认期(confirm_off_duty_sec,默认30秒) 4. 确认离岗后 → 离岗倒计时(leave_countdown_sec,默认300秒) 5. 倒计时结束 → 触发告警(ALARMED状态) 6. 人员回岗 → 回岗确认期(confirm_return_sec,默认10秒) 7. 确认回岗 → 发送resolve事件 → 重新上岗确认 """ # 状态定义 STATE_INIT = "INIT" # 初始化 STATE_CONFIRMING_ON_DUTY = "CONFIRMING_ON_DUTY" # 上岗确认中 STATE_ON_DUTY = "ON_DUTY" # 在岗 STATE_CONFIRMING_OFF_DUTY = "CONFIRMING_OFF_DUTY" # 离岗确认中 STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" # 离岗倒计时中 STATE_ALARMED = "ALARMED" # 已告警(等待回岗) STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间 # 告警级别常量(默认值,可通过 params 覆盖) DEFAULT_ALARM_LEVEL = 2 # 普通 def __init__( self, confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长) confirm_off_duty_sec: int = 30, # 离岗确认窗口(持续未检测到人的时长) confirm_return_sec: int = 10, # 回岗确认窗口(告警后回来需确认时长) leave_countdown_sec: int = 300, # 离岗倒计时(确认离岗后等待告警的时长) cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔) working_hours: Optional[List[Dict]] = None, target_class: Optional[str] = "person", alarm_level: Optional[int] = None, # 兼容旧参数名(向后兼容) confirm_leave_sec: Optional[int] = None, ): # 时间参数(处理向后兼容) self.confirm_on_duty_sec = confirm_on_duty_sec self.confirm_off_duty_sec = confirm_leave_sec if confirm_leave_sec is not None else confirm_off_duty_sec self.confirm_return_sec = confirm_return_sec self.leave_countdown_sec = leave_countdown_sec self.cooldown_sec = cooldown_sec # 工作时间和目标类别 self.working_hours = working_hours or [] self.target_class = target_class self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_INIT self.state_start_time: Optional[datetime] = None # 滑动窗口(用于平滑检测结果) self.detection_window: deque = deque() # [(timestamp, has_person), ...] self.window_size_sec = 10 # 滑动窗口大小:10秒 # 告警追踪 self._last_alarm_id: Optional[str] = None self._leave_start_time: Optional[datetime] = None # 人员离开时间(用于计算持续时长) self._alarm_triggered_time: Optional[datetime] = None # 告警触发时间 self.alert_cooldowns: Dict[str, datetime] = {} def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: """检查是否在工作时间""" if not self.working_hours: return True import json working_hours = self.working_hours if isinstance(working_hours, str): try: working_hours = json.loads(working_hours) except: return True if not working_hours: return True dt = dt or datetime.now() current_minutes = dt.hour * 60 + dt.minute for period in working_hours: start_str = period["start"] if isinstance(period, dict) else period end_str = period["end"] if isinstance(period, dict) else period start_minutes = self._parse_time_to_minutes(start_str) end_minutes = self._parse_time_to_minutes(end_str) if start_minutes <= current_minutes < end_minutes: return True return False def _parse_time_to_minutes(self, time_str: str) -> int: """将时间字符串转换为分钟数""" if isinstance(time_str, int): return time_str try: parts = time_str.split(":") return int(parts[0]) * 60 + int(parts[1]) except: return 0 def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: """检查检测结果是否在ROI内""" matched_rois = detection.get("matched_rois", []) return any(roi.get("roi_id") == roi_id for roi in matched_rois) def _check_target_class(self, detection: Dict, target_class: str) -> bool: """检查是否为目标类别""" if not target_class: return True return detection.get("class") == target_class def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: """获取ROI内最新的检测框""" for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): return det.get("bbox", []) return [] def _update_detection_window(self, current_time: datetime, has_person: bool): """更新滑动窗口""" self.detection_window.append((current_time, has_person)) # 移除窗口外的旧数据 while self.detection_window: oldest_time, _ = self.detection_window[0] if (current_time - oldest_time).total_seconds() > self.window_size_sec: self.detection_window.popleft() else: break def _get_detection_ratio(self) -> float: """计算滑动窗口内的检测命中率""" if not self.detection_window: return 0.0 person_count = sum(1 for _, has_person in self.detection_window if has_person) return person_count / len(self.detection_window) def process( self, roi_id: str, camera_id: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: """ 处理单帧检测结果 Args: roi_id: ROI区域ID camera_id: 摄像头ID tracks: 检测结果列表 current_time: 当前时间(用于测试,生产环境传None) Returns: 告警列表 [{"alert_type": "leave_post", ...}, {"alert_type": "alarm_resolve", ...}] """ current_time = current_time or datetime.now() alerts = [] # 检查ROI内是否有目标 roi_has_person = any( self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class) for det in tracks ) # 更新滑动窗口 self._update_detection_window(current_time, roi_has_person) detection_ratio = self._get_detection_ratio() # 检查工作时间 in_working_hours = self._is_in_working_hours(current_time) # === 非工作时间处理 === if not in_working_hours: if self.state != self.STATE_NON_WORK_TIME: # 进入非工作时间,清理状态 if self._last_alarm_id and self._leave_start_time: # 如果有未结束的告警,发送resolve事件(非工作时间自动关闭) duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "resolve_type": "non_work_time", }) self.state = self.STATE_NON_WORK_TIME self.state_start_time = None self.detection_window.clear() self._last_alarm_id = None self._leave_start_time = None self._alarm_triggered_time = None return alerts # === 工作时间处理 === # 从非工作时间恢复 if self.state == self.STATE_NON_WORK_TIME: self.state = self.STATE_INIT self.state_start_time = current_time self.detection_window.clear() # === 状态机处理 === if self.state == self.STATE_INIT: # 初始化状态:等待检测到人 if roi_has_person: self.state = self.STATE_CONFIRMING_ON_DUTY self.state_start_time = current_time logger.debug(f"ROI {roi_id}: INIT → CONFIRMING_ON_DUTY") elif self.state == self.STATE_CONFIRMING_ON_DUTY: # 上岗确认中:需要持续检测到人 elapsed = (current_time - self.state_start_time).total_seconds() if detection_ratio == 0: # 人消失了,回到INIT self.state = self.STATE_INIT self.state_start_time = current_time self.detection_window.clear() logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)") elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.6: # 上岗确认成功(命中率>=70%) self.state = self.STATE_ON_DUTY self.state_start_time = current_time self.alert_cooldowns.clear() # 确认在岗后清除冷却记录 logger.info(f"ROI {roi_id}: CONFIRMING_ON_DUTY → ON_DUTY (上岗确认成功)") elif self.state == self.STATE_ON_DUTY: # 在岗状态:监控是否离岗 if detection_ratio < 0.2: # 滑动窗口内 80% 以上帧无人,进入离岗确认 self.state = self.STATE_CONFIRMING_OFF_DUTY self.state_start_time = current_time logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY") elif self.state == self.STATE_CONFIRMING_OFF_DUTY: # 离岗确认中:需要持续未检测到人 elapsed = (current_time - self.state_start_time).total_seconds() if detection_ratio >= 0.5: # 窗口内检测率恢复到 50% 以上,人确实回来了 self.state = self.STATE_ON_DUTY self.state_start_time = current_time logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了, ratio={detection_ratio:.2f})") elif elapsed >= self.confirm_off_duty_sec and detection_ratio < 0.2: # 离岗确认成功,进入倒计时 self.state = self.STATE_OFF_DUTY_COUNTDOWN self.state_start_time = current_time self._leave_start_time = self.state_start_time # 记录离开时间 logger.info(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → OFF_DUTY_COUNTDOWN (离岗确认成功)") elif self.state == self.STATE_OFF_DUTY_COUNTDOWN: # 离岗倒计时中:等待告警触发 elapsed = (current_time - self.state_start_time).total_seconds() if roi_has_person: # 倒计时期间人回来了,回到ON_DUTY(未触发告警) self.state = self.STATE_ON_DUTY self.state_start_time = current_time self._leave_start_time = None logger.info(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ON_DUTY (倒计时期间回来)") elif elapsed >= self.leave_countdown_sec: # 倒计时结束,触发告警 cooldown_key = f"{camera_id}_{roi_id}" if cooldown_key not in self.alert_cooldowns or \ (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: bbox = self._get_latest_bbox(tracks, roi_id) alerts.append({ "track_id": roi_id, "camera_id": camera_id, "bbox": bbox, "alert_type": "leave_post", "alarm_level": self._alarm_level, "message": "人员离岗告警", "first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'), }) self.alert_cooldowns[cooldown_key] = current_time self._alarm_triggered_time = current_time self.state = self.STATE_ALARMED # _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填 logger.warning(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ALARMED (告警触发)") elif self.state == self.STATE_ALARMED: # 已告警状态:等待人员回岗 if roi_has_person: # 检测到人,进入回岗确认 self.state = self.STATE_CONFIRMING_ON_DUTY # 复用上岗确认状态 self.state_start_time = current_time logger.info(f"ROI {roi_id}: ALARMED → CONFIRMING_ON_DUTY (检测到人回岗)") # 特殊处理:从CONFIRMING_ON_DUTY再次确认上岗时,如果有未结束的告警,发送resolve if self.state == self.STATE_ON_DUTY and self._last_alarm_id: # 回岗确认成功,发送resolve事件 duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "resolve_type": "person_returned", }) # 清理告警追踪信息 self._last_alarm_id = None self._leave_start_time = None self._alarm_triggered_time = None logger.info(f"ROI {roi_id}: 告警已解决(人员回岗)") return alerts def set_last_alarm_id(self, alarm_id: str): """由 main.py 在告警生成后回填 alarm_id""" self._last_alarm_id = alarm_id def reset(self): """重置算法状态""" self.state = self.STATE_INIT self.state_start_time = None self.detection_window.clear() self._last_alarm_id = None self._leave_start_time = None self._alarm_triggered_time = None self.alert_cooldowns.clear() def get_state(self, roi_id: str) -> Dict[str, Any]: """获取当前状态(用于调试和监控)""" state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, "detection_ratio": self._get_detection_ratio(), "window_size": len(self.detection_window), } # 添加状态特定信息 if self.state == self.STATE_OFF_DUTY_COUNTDOWN and self.state_start_time: elapsed = (datetime.now() - self.state_start_time).total_seconds() state_info["countdown_remaining_sec"] = max(0, self.leave_countdown_sec - elapsed) if self.state == self.STATE_ALARMED and self._leave_start_time: total_off_duty_sec = (datetime.now() - self._leave_start_time).total_seconds() state_info["total_off_duty_sec"] = total_off_duty_sec state_info["alarm_id"] = self._last_alarm_id return state_info class IntrusionAlgorithm: """ 周界入侵检测算法(状态机版本 v3.0) 状态机: IDLE → CONFIRMING_INTRUSION → ALARMED → CONFIRMING_CLEAR → IDLE 业务流程: 1. 检测到人 → 入侵确认期(confirm_intrusion_seconds,默认5秒) 2. 确认入侵 → 触发告警(ALARMED状态) 3. 人离开ROI → 入侵消失确认期(confirm_clear_seconds,默认180秒) 4. 消失确认期间: - 短暂有人(<5秒):继续倒计时 - 持续有人(≥5秒):回到ALARMED状态 5. 确认消失(持续无人180秒)→ 发送resolve事件 → 回到空闲状态 """ # 状态定义 STATE_IDLE = "IDLE" # 空闲(无入侵) STATE_CONFIRMING_INTRUSION = "CONFIRMING_INTRUSION" # 入侵确认中 STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失) STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中 # 告警级别常量(默认值,可通过 params 覆盖) DEFAULT_ALARM_LEVEL = 1 # 重要 def __init__( self, cooldown_seconds: int = 300, confirm_seconds: int = 5, # 向后兼容:同时设置入侵和消失确认时间 confirm_intrusion_seconds: Optional[int] = None, # 入侵确认时间(默认5秒) confirm_clear_seconds: Optional[int] = None, # 消失确认时间(默认180秒) target_class: Optional[str] = None, alarm_level: Optional[int] = None, ): self.cooldown_seconds = cooldown_seconds # 参数兼容处理 self.confirm_intrusion_seconds = confirm_intrusion_seconds if confirm_intrusion_seconds is not None else confirm_seconds self.confirm_clear_seconds = confirm_clear_seconds if confirm_clear_seconds is not None else 180 # 向后兼容 self.confirm_seconds = confirm_seconds self.target_class = target_class self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_IDLE self.state_start_time: Optional[datetime] = None # 告警追踪 self._last_alarm_id: Optional[str] = None self._intrusion_start_time: Optional[datetime] = None # CONFIRMING_CLEAR状态下检测到人的时间(用于判断是否持续5秒) self._person_detected_in_clear_time: Optional[datetime] = None # 冷却期管理 self.alert_cooldowns: Dict[str, datetime] = {} # 向后兼容:保留旧变量(不再使用) self.last_alert_time: Dict[str, datetime] = {} self.alert_triggered: Dict[str, bool] = {} self.detection_start: Dict[str, Optional[datetime]] = {} def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: matched_rois = detection.get("matched_rois", []) for roi in matched_rois: if roi.get("roi_id") == roi_id: return True return False def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: if not target_class: return True return detection.get("class") == target_class def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: if self._check_detection_in_roi(det, roi_id): return det.get("bbox", []) return [] def process( self, roi_id: str, camera_id: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: """ 处理单帧检测结果 Args: roi_id: ROI区域ID camera_id: 摄像头ID tracks: 检测结果列表 current_time: 当前时间(用于测试,生产环境传None) Returns: 告警列表 [{"alert_type": "intrusion", ...}, {"alert_type": "alarm_resolve", ...}] """ current_time = current_time or datetime.now() alerts = [] # 检查ROI内是否有目标 roi_has_person = any( self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class) for det in tracks ) # === 状态机处理 === if self.state == self.STATE_IDLE: # 空闲状态:等待检测到入侵 if roi_has_person: self.state = self.STATE_CONFIRMING_INTRUSION self.state_start_time = current_time logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_INTRUSION") elif self.state == self.STATE_CONFIRMING_INTRUSION: # 入侵确认中:需要持续检测到人 if self.state_start_time is None: # 防御性编程:如果状态时间为空,重置到IDLE self.state = self.STATE_IDLE logger.error(f"ROI {roi_id}: CONFIRMING_INTRUSION 状态异常,state_start_time为None,重置到IDLE") else: elapsed = (current_time - self.state_start_time).total_seconds() if not roi_has_person: # 人消失了,回到IDLE self.state = self.STATE_IDLE self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)") elif elapsed >= self.confirm_intrusion_seconds: # 入侵确认成功,检查冷却期 cooldown_key = f"{camera_id}_{roi_id}" if cooldown_key not in self.alert_cooldowns or \ (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_seconds: bbox = self._get_latest_bbox(tracks, roi_id) self._intrusion_start_time = self.state_start_time # 记录入侵开始时间 alerts.append({ "roi_id": roi_id, "camera_id": camera_id, "bbox": bbox, "alert_type": "intrusion", "alarm_level": self._alarm_level, "message": "检测到周界入侵", "first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'), }) self.alert_cooldowns[cooldown_key] = current_time self.state = self.STATE_ALARMED # _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填 logger.warning(f"ROI {roi_id}: CONFIRMING_INTRUSION → ALARMED (告警触发)") else: # 冷却期内,回到IDLE self.state = self.STATE_IDLE self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (冷却期内)") elif self.state == self.STATE_ALARMED: # 已告警状态:等待入侵消失 if not roi_has_person: # 检测到无人,进入消失确认 self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR") elif self.state == self.STATE_CONFIRMING_CLEAR: # 入侵消失确认中:需要持续未检测到人 if self.state_start_time is None: # 防御性编程:如果状态时间为空,重置到IDLE self.state = self.STATE_IDLE logger.error(f"ROI {roi_id}: CONFIRMING_CLEAR 状态异常,state_start_time为None,重置到IDLE") else: elapsed = (current_time - self.state_start_time).total_seconds() if roi_has_person: # 检测到有人 if self._person_detected_in_clear_time is None: # 第一次检测到人,记录时间 self._person_detected_in_clear_time = current_time logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR 检测到人,开始确认(需持续{self.confirm_intrusion_seconds}秒)") else: # 持续有人,检查是否达到确认时间 person_elapsed = (current_time - self._person_detected_in_clear_time).total_seconds() if person_elapsed >= self.confirm_intrusion_seconds: # 确认有人重新入侵,回到ALARMED self.state = self.STATE_ALARMED self.state_start_time = None self._person_detected_in_clear_time = None logger.info(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (确认有人重新入侵,持续{person_elapsed:.1f}秒)") else: # 没有人 self._person_detected_in_clear_time = None # 清除临时计时 # 检查是否达到消失确认时间 if elapsed >= self.confirm_clear_seconds: # 消失确认成功,发送resolve事件 if self._last_alarm_id and self._intrusion_start_time: duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "resolve_type": "intrusion_cleared", }) logger.info(f"ROI {roi_id}: 告警已解决(入侵消失,持续无人{elapsed:.1f}秒)") # 重置状态 self.state = self.STATE_IDLE self.state_start_time = None self._last_alarm_id = None self._intrusion_start_time = None self._person_detected_in_clear_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)") return alerts def set_last_alarm_id(self, alarm_id: str): """由 main.py 在告警生成后回填 alarm_id""" self._last_alarm_id = alarm_id def reset(self): """重置算法状态""" self.state = self.STATE_IDLE self.state_start_time = None self._last_alarm_id = None self._intrusion_start_time = None self._person_detected_in_clear_time = None self.alert_cooldowns.clear() # 向后兼容 self.last_alert_time.clear() self.alert_triggered.clear() self.detection_start.clear() def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: """获取当前状态(用于调试和监控)""" current_time = current_time or datetime.now() state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, } # 添加状态特定信息 if self.state == self.STATE_ALARMED and self._intrusion_start_time: total_intrusion_sec = (current_time - self._intrusion_start_time).total_seconds() state_info["total_intrusion_sec"] = total_intrusion_sec state_info["alarm_id"] = self._last_alarm_id return state_info # class CrowdDetectionAlgorithm: # """人群聚集检测算法 - 暂时注释,后续需要时再启用""" # # def __init__( # self, # max_count: int = 10, # cooldown_seconds: int = 300, # target_class: Optional[str] = "person", # ): # self.max_count = max_count # self.cooldown_seconds = cooldown_seconds # self.target_class = target_class # # self.last_alert_time: Dict[str, datetime] = {} # self.alert_triggered: Dict[str, bool] = {} # # def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: # matched_rois = detection.get("matched_rois", []) # for roi in matched_rois: # if roi.get("roi_id") == roi_id: # return True # return False # # def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: # if not target_class: # return True # return detection.get("class") == target_class # # def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]: # bboxes = [] # for det in tracks: # if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): # bboxes.append(det.get("bbox", [])) # return bboxes # # def process( # self, # roi_id: str, # camera_id: str, # tracks: List[Dict], # current_time: Optional[datetime] = None, # ) -> List[Dict]: # current_time = current_time or datetime.now() # key = f"{camera_id}_{roi_id}" # # person_count = 0 # for det in tracks: # if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): # person_count += 1 # # if person_count <= self.max_count: # self.alert_triggered[key] = False # return [] # # if self.alert_triggered.get(key, False): # elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds() # if elapsed_since_alert < self.cooldown_seconds: # return [] # self.alert_triggered[key] = False # # bboxes = self._get_bboxes(tracks, roi_id) # self.last_alert_time[key] = current_time # self.alert_triggered[key] = True # # return [{ # "roi_id": roi_id, # "camera_id": camera_id, # "bbox": bboxes[0] if bboxes else [], # "alert_type": "crowd_detection", # "message": f"检测到人群聚集,当前人数: {person_count}", # "count": person_count, # }] # # def reset(self): # self.last_alert_time.clear() # self.alert_triggered.clear() class IllegalParkingAlgorithm: """ 车辆违停检测算法(状态机版本 v1.0) 状态机: IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE 业务流程: 1. 检测到车辆进入禁停区 → 车辆确认期(confirm_vehicle_sec,默认15秒,ratio≥0.6) 2. 确认有车 → 违停倒计时(parking_countdown_sec,默认300秒/5分钟) 3. 倒计时结束仍有车 → 触发告警(ALARMED状态) 4. 车辆离开 → 消失确认期(confirm_clear_sec,默认30秒,ratio<0.2) 5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态 使用滑动窗口(10秒)抗抖动,支持多类车辆检测。 """ # 状态定义 STATE_IDLE = "IDLE" STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE" STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN" STATE_ALARMED = "ALARMED" STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 告警级别常量(默认值,可通过 params 覆盖) DEFAULT_ALARM_LEVEL = 1 # 重要 # 滑动窗口参数 WINDOW_SIZE_SEC = 10 def __init__( self, confirm_vehicle_sec: int = 15, parking_countdown_sec: int = 300, confirm_clear_sec: int = 120, cooldown_sec: int = 1800, target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, ): self.confirm_vehicle_sec = confirm_vehicle_sec self.parking_countdown_sec = parking_countdown_sec self.confirm_clear_sec = confirm_clear_sec self.cooldown_sec = cooldown_sec self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_IDLE self.state_start_time: Optional[datetime] = None # 滑动窗口:存储 (timestamp, has_vehicle: bool) self._detection_window: deque = deque() # 告警追踪 self._last_alarm_id: Optional[str] = None self._parking_start_time: Optional[datetime] = None # 冷却期管理 self.alert_cooldowns: Dict[str, datetime] = {} def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: matched_rois = detection.get("matched_rois", []) for roi in matched_rois: if roi.get("roi_id") == roi_id: return True return False def _check_target_classes(self, detection: Dict) -> bool: """检查检测目标是否属于车辆类别""" det_class = detection.get("class", "") return det_class in self.target_classes def _update_window(self, current_time: datetime, has_vehicle: bool): """更新滑动窗口""" self._detection_window.append((current_time, has_vehicle)) cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) while self._detection_window and self._detection_window[0][0] < cutoff: self._detection_window.popleft() def _get_window_ratio(self) -> float: """获取滑动窗口内的检测命中率""" if not self._detection_window: return 0.0 hits = sum(1 for _, has in self._detection_window if has) return hits / len(self._detection_window) def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): return det.get("bbox", []) return [] def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: """获取ROI内车辆的最高置信度""" max_conf = 0.0 for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): max_conf = max(max_conf, det.get("confidence", 0.0)) return max_conf def process( self, roi_id: str, camera_id: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: """处理单帧检测结果""" current_time = current_time or datetime.now() alerts = [] # 检查ROI内是否有车辆 roi_has_vehicle = any( self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) for det in tracks ) # 更新滑动窗口 self._update_window(current_time, roi_has_vehicle) ratio = self._get_window_ratio() # === 状态机处理 === if self.state == self.STATE_IDLE: if roi_has_vehicle: self.state = self.STATE_CONFIRMING_VEHICLE self.state_start_time = current_time logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE") elif self.state == self.STATE_CONFIRMING_VEHICLE: if self.state_start_time is None: self.state = self.STATE_IDLE return alerts elapsed = (current_time - self.state_start_time).total_seconds() if ratio < 0.3: # 命中率过低,车辆可能只是路过 self.state = self.STATE_IDLE self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)") elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6: # 确认有车辆停留,进入倒计时 self._parking_start_time = self.state_start_time self.state = self.STATE_PARKED_COUNTDOWN self.state_start_time = current_time logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (ratio={ratio:.2f})") elif self.state == self.STATE_PARKED_COUNTDOWN: if self.state_start_time is None: self.state = self.STATE_IDLE return alerts elapsed = (current_time - self.state_start_time).total_seconds() if ratio < 0.2: # 车辆已离开 self.state = self.STATE_IDLE self.state_start_time = None self._parking_start_time = None logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (车辆离开, ratio={ratio:.2f})") elif elapsed >= self.parking_countdown_sec: # 倒计时结束,检查冷却期 cooldown_key = f"{camera_id}_{roi_id}" if cooldown_key not in self.alert_cooldowns or \ (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: bbox = self._get_latest_bbox(tracks, roi_id) confidence = self._get_max_confidence(tracks, roi_id) alerts.append({ "roi_id": roi_id, "camera_id": camera_id, "bbox": bbox, "alert_type": "illegal_parking", "alarm_level": self._alarm_level, "confidence": confidence, "message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)", "first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None, "duration_minutes": elapsed / 60, }) self.alert_cooldowns[cooldown_key] = current_time self.state = self.STATE_ALARMED logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (违停告警触发)") else: self.state = self.STATE_IDLE self.state_start_time = None self._parking_start_time = None logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)") elif self.state == self.STATE_ALARMED: if ratio < 0.15: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<0.15)") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: self.state = self.STATE_IDLE return alerts elapsed = (current_time - self.state_start_time).total_seconds() if ratio >= 0.5: # 车辆又出现,回到ALARMED self.state = self.STATE_ALARMED self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)") elif elapsed >= self.confirm_clear_sec and ratio < 0.2: # 确认车辆已离开 if self._last_alarm_id and self._parking_start_time: duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "resolve_type": "vehicle_left", }) logger.info(f"ROI {roi_id}: 违停告警已解决(车辆离开)") self.state = self.STATE_IDLE self.state_start_time = None self._last_alarm_id = None self._parking_start_time = None self.alert_cooldowns.clear() # 车辆离开后清冷却,新车违停可正常告警 logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE") return alerts def set_last_alarm_id(self, alarm_id: str): """由 main.py 在告警生成后回填 alarm_id""" self._last_alarm_id = alarm_id def reset(self): """重置算法状态""" self.state = self.STATE_IDLE self.state_start_time = None self._last_alarm_id = None self._parking_start_time = None self._detection_window.clear() self.alert_cooldowns.clear() def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: """获取当前状态""" current_time = current_time or datetime.now() state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, "window_ratio": self._get_window_ratio(), } if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time: state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds() state_info["alarm_id"] = self._last_alarm_id return state_info class VehicleCongestionAlgorithm: """ 车辆拥堵检测算法(状态机版本 v1.0) 状态机: NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL 业务流程: 1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期(confirm_congestion_sec,默认60秒) 2. 确认拥堵(窗口内平均车辆数 ≥ threshold)→ 触发告警 3. 车辆减少 → 消散确认期(confirm_clear_sec,默认120秒) 4. 确认消散(平均数 < threshold)→ 发送resolve事件 → 回到正常 使用滑动窗口(10秒)存储车辆计数,取平均值判断。 """ # 状态定义 STATE_NORMAL = "NORMAL" STATE_CONFIRMING_CONGESTION = "CONFIRMING_CONGESTION" STATE_CONGESTED = "CONGESTED" STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 告警级别(默认值,可通过 params 覆盖) DEFAULT_ALARM_LEVEL = 2 # 普通 # 滑动窗口参数 WINDOW_SIZE_SEC = 10 def __init__( self, count_threshold: int = 5, confirm_congestion_sec: int = 60, confirm_clear_sec: int = 180, cooldown_sec: int = 1800, target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, ): self.count_threshold = count_threshold self.confirm_congestion_sec = confirm_congestion_sec self.confirm_clear_sec = confirm_clear_sec self.cooldown_sec = cooldown_sec self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_NORMAL self.state_start_time: Optional[datetime] = None # 滑动窗口:存储 (timestamp, vehicle_count: int) self._count_window: deque = deque() # 告警追踪 self._last_alarm_id: Optional[str] = None self._congestion_start_time: Optional[datetime] = None # 冷却期管理 self.alert_cooldowns: Dict[str, datetime] = {} def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: matched_rois = detection.get("matched_rois", []) for roi in matched_rois: if roi.get("roi_id") == roi_id: return True return False def _check_target_classes(self, detection: Dict) -> bool: det_class = detection.get("class", "") return det_class in self.target_classes def _count_vehicles_in_roi(self, tracks: List[Dict], roi_id: str) -> int: """统计ROI内的车辆数量""" return sum( 1 for det in tracks if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) ) def _update_count_window(self, current_time: datetime, count: int): """更新车辆计数滑动窗口""" self._count_window.append((current_time, count)) cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) while self._count_window and self._count_window[0][0] < cutoff: self._count_window.popleft() def _get_avg_count(self) -> float: """获取滑动窗口内的平均车辆数""" if not self._count_window: return 0.0 total = sum(c for _, c in self._count_window) return total / len(self._count_window) def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: max_conf = 0.0 for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): max_conf = max(max_conf, det.get("confidence", 0.0)) return max_conf def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): return det.get("bbox", []) return [] def process( self, roi_id: str, camera_id: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: """处理单帧检测结果""" current_time = current_time or datetime.now() alerts = [] # 统计ROI内车辆数 vehicle_count = self._count_vehicles_in_roi(tracks, roi_id) self._update_count_window(current_time, vehicle_count) avg_count = self._get_avg_count() # === 状态机处理 === if self.state == self.STATE_NORMAL: if avg_count >= self.count_threshold: self.state = self.STATE_CONFIRMING_CONGESTION self.state_start_time = current_time logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}≥{self.count_threshold})") elif self.state == self.STATE_CONFIRMING_CONGESTION: if self.state_start_time is None: self.state = self.STATE_NORMAL return alerts elapsed = (current_time - self.state_start_time).total_seconds() if avg_count < self.count_threshold: # 车辆减少,回到正常 self.state = self.STATE_NORMAL self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (avg={avg_count:.1f}<{self.count_threshold})") elif elapsed >= self.confirm_congestion_sec: # 确认拥堵,检查冷却期 cooldown_key = f"{camera_id}_{roi_id}" if cooldown_key not in self.alert_cooldowns or \ (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: self._congestion_start_time = self.state_start_time bbox = self._get_latest_bbox(tracks, roi_id) confidence = self._get_max_confidence(tracks, roi_id) alerts.append({ "roi_id": roi_id, "camera_id": camera_id, "bbox": bbox, "alert_type": "vehicle_congestion", "alarm_level": self._alarm_level, "confidence": confidence, "message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)", "first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None, "vehicle_count": int(avg_count), }) self.alert_cooldowns[cooldown_key] = current_time self.state = self.STATE_CONGESTED logger.warning(f"ROI {roi_id}: CONFIRMING_CONGESTION → CONGESTED (拥堵告警触发, avg={avg_count:.1f})") else: self.state = self.STATE_NORMAL self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)") elif self.state == self.STATE_CONGESTED: # 车辆数降到阈值的一半以下才开始确认消散(避免抖动) if avg_count < self.count_threshold * 0.5: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold * 0.5:.1f})") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: self.state = self.STATE_NORMAL return alerts elapsed = (current_time - self.state_start_time).total_seconds() if avg_count >= self.count_threshold: # 又拥堵了,回到CONGESTED self.state = self.STATE_CONGESTED self.state_start_time = None logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}≥{self.count_threshold})") elif elapsed >= self.confirm_clear_sec: # 确认消散 if self._last_alarm_id and self._congestion_start_time: duration_ms = int((current_time - self._congestion_start_time).total_seconds() * 1000) alerts.append({ "alert_type": "alarm_resolve", "resolve_alarm_id": self._last_alarm_id, "duration_ms": duration_ms, "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "resolve_type": "congestion_cleared", }) logger.info(f"ROI {roi_id}: 拥堵告警已解决(车辆减少)") self.state = self.STATE_NORMAL self.state_start_time = None self._last_alarm_id = None self._congestion_start_time = None self.alert_cooldowns.clear() # 拥堵消散后清冷却,再次拥堵可正常告警 logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL") return alerts def set_last_alarm_id(self, alarm_id: str): """由 main.py 在告警生成后回填 alarm_id""" self._last_alarm_id = alarm_id def reset(self): """重置算法状态""" self.state = self.STATE_NORMAL self.state_start_time = None self._last_alarm_id = None self._congestion_start_time = None self._count_window.clear() self.alert_cooldowns.clear() def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: """获取当前状态""" current_time = current_time or datetime.now() state_info = { "state": self.state, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, "avg_vehicle_count": self._get_avg_count(), } if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time: state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds() state_info["alarm_id"] = self._last_alarm_id return state_info class AlgorithmManager: def __init__(self, working_hours: Optional[List[Dict]] = None): self.algorithms: Dict[str, Dict[str, Any]] = {} self.working_hours = working_hours or [] self._update_lock = threading.Lock() self._registered_keys: set = set() # 已注册的 (roi_id, bind_id, algo_type) 缓存 self.default_params = { "leave_post": { "confirm_on_duty_sec": 10, "confirm_leave_sec": 30, "cooldown_sec": 600, "target_class": "person", }, "intrusion": { "cooldown_seconds": 300, "confirm_seconds": 5, "target_class": None, }, "illegal_parking": { "confirm_vehicle_sec": 15, "parking_countdown_sec": 300, "confirm_clear_sec": 30, "cooldown_sec": 600, "target_classes": ["car", "truck", "bus", "motorcycle"], }, "vehicle_congestion": { "count_threshold": 3, "confirm_congestion_sec": 60, "confirm_clear_sec": 120, "cooldown_sec": 600, "target_classes": ["car", "truck", "bus", "motorcycle"], }, } self._pubsub = None self._pubsub_thread = None self._running = False def start_config_subscription(self): """启动配置变更订阅""" try: from config.settings import get_settings settings = get_settings() if settings.config_sync_mode != "REDIS": logger.info("CONFIG_SYNC_MODE=LOCAL: 跳过 Redis 配置订阅") return redis_client = redis.Redis( host=settings.redis.host, port=settings.redis.port, db=settings.redis.db, password=settings.redis.password, decode_responses=True, ) self._pubsub = redis_client.pubsub() self._pubsub.subscribe("config_update") self._running = True self._pubsub_thread = threading.Thread( target=self._config_update_worker, name="ConfigUpdateSub", daemon=True ) self._pubsub_thread.start() logger.info("已启动配置变更订阅") except Exception as e: logger.error(f"启动配置订阅失败: {e}") def _config_update_worker(self): """配置更新订阅工作线程""" try: for message in self._pubsub.listen(): if not self._running: break if message["type"] == "message": try: import json data = json.loads(message["data"]) update_type = data.get("type", "full") if update_type == "roi": roi_ids = data.get("ids", []) if roi_ids: for roi_id in roi_ids: self.reload_algorithm(roi_id) else: self.reload_all_algorithms() elif update_type == "bind": bind_ids = data.get("ids", []) if bind_ids: for bind_id in bind_ids: self.reload_bind_algorithm(bind_id) else: self.reload_all_algorithms() else: # type="full" / "camera" / unknown → 全量重载 self.reload_all_algorithms() except Exception as e: logger.error(f"处理配置更新消息失败: {e}") except Exception as e: logger.error(f"配置订阅线程异常: {e}") def stop_config_subscription(self): """停止配置变更订阅""" self._running = False if self._pubsub: self._pubsub.close() if self._pubsub_thread and self._pubsub_thread.is_alive(): self._pubsub_thread.join(timeout=5) logger.info("配置订阅已停止") def load_bind_from_redis(self, bind_id: str) -> bool: """从Redis加载单个绑定配置的算法""" try: from core.config_sync import get_config_sync_manager config_manager = get_config_sync_manager() bind_config = config_manager.get_algo_bind_from_redis(bind_id) if not bind_config: return False with self._update_lock: roi_id = bind_config.get("roi_id") algo_code = bind_config.get("algo_code", "leave_post") raw_params = bind_config.get("params") if isinstance(raw_params, str): try: import json params = json.loads(raw_params) or {} except Exception: params = {} elif isinstance(raw_params, dict): params = raw_params else: params = {} if roi_id not in self.algorithms: self.algorithms[roi_id] = {} key = f"{roi_id}_{bind_id}" if algo_code == "leave_post": configured_alarm_level = params.get("alarm_level") algo_params = { "confirm_on_duty_sec": params.get("confirm_on_duty_sec", 10), "confirm_leave_sec": params.get("confirm_leave_sec", 30), "cooldown_sec": params.get("cooldown_sec", 600), "working_hours": params.get("working_hours", []), "target_class": params.get("target_class", bind_config.get("target_class", "person")), } if key in self.algorithms.get(roi_id, {}) and "leave_post" in self.algorithms[roi_id].get(key, {}): algo = self.algorithms[roi_id][key]["leave_post"] algo.confirm_on_duty_sec = algo_params["confirm_on_duty_sec"] algo.confirm_leave_sec = algo_params["confirm_leave_sec"] algo.cooldown_sec = algo_params["cooldown_sec"] algo.target_class = algo_params["target_class"] if configured_alarm_level is not None: algo._alarm_level = configured_alarm_level if algo_params["working_hours"]: algo.working_hours = algo_params["working_hours"] logger.info(f"已热更新算法参数: {key}") else: self.algorithms[roi_id][key] = {} self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params["confirm_on_duty_sec"], confirm_leave_sec=algo_params["confirm_leave_sec"], leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), cooldown_sec=algo_params["cooldown_sec"], working_hours=algo_params["working_hours"], target_class=algo_params["target_class"], alarm_level=configured_alarm_level, ) logger.info(f"已从Redis加载算法: {key}") elif algo_code == "intrusion": configured_alarm_level = params.get("alarm_level") algo_params = { "cooldown_seconds": params.get("cooldown_seconds", 300), "confirm_seconds": params.get("confirm_seconds", 5), "target_class": params.get("target_class", bind_config.get("target_class")), } self.algorithms[roi_id][key] = {} self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm( cooldown_seconds=algo_params["cooldown_seconds"], confirm_seconds=algo_params["confirm_seconds"], target_class=algo_params["target_class"], alarm_level=configured_alarm_level, ) logger.info(f"已从Redis加载算法: {key}") return True except Exception as e: logger.error(f"从Redis加载算法配置失败: {e}") return False def reload_bind_algorithm(self, bind_id: str) -> bool: """重新加载单个绑定的算法配置""" return self.load_bind_from_redis(bind_id) def reload_algorithm(self, roi_id: str) -> bool: """重新加载单个ROI的所有算法绑定配置""" try: from core.config_sync import get_config_sync_manager config_manager = get_config_sync_manager() bindings = config_manager.get_bindings_from_redis(roi_id) if not bindings: return False for bind in bindings: bind_id = bind.get("bind_id") self.reset_algorithm(roi_id, bind_id) self.load_bind_from_redis(bind_id) return True except Exception as e: logger.error(f"重新加载ROI算法配置失败: {e}") return False def update_algorithm_params(self, roi_id: str, bind_id: str, bind_config: dict) -> bool: """仅更新算法参数,保留状态机 Args: roi_id: ROI ID bind_id: 绑定ID bind_config: 绑定配置字典(包含algo_code和params) Returns: 是否成功更新 """ try: import json key = f"{roi_id}_{bind_id}" # 算法实例不存在,创建新的 if roi_id not in self.algorithms or key not in self.algorithms[roi_id]: return self.load_bind_from_redis(bind_id) # 提取参数 params_str = bind_config.get("params", "{}") params = json.loads(params_str) if isinstance(params_str, str) else params_str algo_code = bind_config.get("algo_code") # 获取现有算法实例 existing_algo = self.algorithms[roi_id][key].get(algo_code) if existing_algo is None: # 算法类型不匹配,重新创建 return self.load_bind_from_redis(bind_id) # 更新参数(根据算法类型调用不同的更新方法) if algo_code == "leave_post": # 更新离岗检测参数 leave_countdown_sec = params.get("leave_countdown_sec", 300) working_hours = params.get("working_hours", []) confirm_on_duty_sec = params.get("confirm_on_duty_sec", 10) confirm_leave_sec = params.get("confirm_leave_sec", 30) cooldown_sec = params.get("cooldown_sec", 600) target_class = params.get("target_class", "person") existing_algo.leave_countdown_sec = leave_countdown_sec existing_algo.working_hours = working_hours existing_algo.confirm_on_duty_sec = confirm_on_duty_sec existing_algo.confirm_leave_sec = confirm_leave_sec existing_algo.cooldown_sec = cooldown_sec existing_algo.target_class = target_class logger.info(f"[{roi_id}_{bind_id}] 更新离岗检测参数: countdown={leave_countdown_sec}s, hours={len(working_hours)}") elif algo_code == "intrusion": # 更新周界入侵参数 confirm_intrusion_sec = params.get("confirm_intrusion_seconds") or params.get("confirm_seconds", 5) confirm_clear_sec = params.get("confirm_clear_seconds", 180) cooldown_sec = params.get("cooldown_seconds", 300) target_class = params.get("target_class") existing_algo.confirm_intrusion_seconds = confirm_intrusion_sec existing_algo.confirm_clear_seconds = confirm_clear_sec existing_algo.cooldown_seconds = cooldown_sec if target_class is not None: existing_algo.target_class = target_class logger.info(f"[{roi_id}_{bind_id}] 更新周界入侵参数: intrusion={confirm_intrusion_sec}s, clear={confirm_clear_sec}s") # 其他算法类型可以在此添加 return True except Exception as e: logger.error(f"更新算法参数失败 {roi_id}_{bind_id}: {e}") return False def reload_all_algorithms(self, preserve_state: bool = True) -> int: """重新加载所有算法配置 Args: preserve_state: 是否保留算法状态(默认True) True - 仅更新参数,保留状态机(用于配置更新) False - 完全重置(用于手动重启) Returns: 成功加载的算法数量 """ count = 0 try: from core.config_sync import get_config_sync_manager config_manager = get_config_sync_manager() bindings = config_manager.get_bindings_from_redis("") # 收集当前配置中有效的 (roi_id, bind_id) 组合 valid_keys = set() for bind in bindings: bind_id = bind.get("bind_id") roi_id = bind.get("roi_id") valid_keys.add((roi_id, bind_id)) if preserve_state: # 仅更新参数,不重置状态 if self.update_algorithm_params(roi_id, bind_id, bind): count += 1 else: # 完全重置 self.reset_algorithm(roi_id, bind_id) if self.load_bind_from_redis(bind_id): count += 1 # 清理内存中已被删除的算法实例 removed_count = 0 with self._update_lock: for roi_id in list(self.algorithms.keys()): for key in list(self.algorithms[roi_id].keys()): # key 格式: "roi_id_bind_id" if key.startswith(roi_id + "_"): bind_id = key[len(roi_id) + 1:] if (roi_id, bind_id) not in valid_keys: for algo in self.algorithms[roi_id][key].values(): algo.reset() del self.algorithms[roi_id][key] # 清除注册缓存 self._registered_keys = { k for k in self._registered_keys if not (k[0] == roi_id and k[1] == bind_id) } removed_count += 1 logger.info(f"清理已删除的算法实例: {key}") # 如果 roi 下已无算法实例,清理空字典 if not self.algorithms[roi_id]: del self.algorithms[roi_id] if removed_count > 0: logger.info(f"已清理 {removed_count} 个孤立算法实例") logger.info(f"已重新加载 {count} 个算法配置 (preserve_state={preserve_state})") return count except Exception as e: logger.error(f"重新加载所有算法配置失败: {e}") return count def register_algorithm( self, roi_id: str, bind_id: str, algorithm_type: str, params: Optional[Dict[str, Any]] = None, ): """注册算法(支持绑定ID),使用缓存避免每帧重复查询""" cache_key = (roi_id, bind_id, algorithm_type) # 快速路径:已注册直接返回 if cache_key in self._registered_keys: return key = f"{roi_id}_{bind_id}" if roi_id not in self.algorithms: self.algorithms[roi_id] = {} if key not in self.algorithms[roi_id]: self.algorithms[roi_id][key] = {} algo_params = self.default_params.get(algorithm_type, {}).copy() if params: algo_params.update(params) # 从 params 中提取告警等级(前端配置下发) configured_alarm_level = algo_params.get("alarm_level") if algorithm_type == "leave_post": roi_working_hours = algo_params.get("working_hours") or self.working_hours self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10), confirm_leave_sec=algo_params.get("confirm_leave_sec", 30), leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟 cooldown_sec=algo_params.get("cooldown_sec", 600), working_hours=roi_working_hours, target_class=algo_params.get("target_class", "person"), alarm_level=configured_alarm_level, ) elif algorithm_type == "intrusion": self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm( cooldown_seconds=algo_params.get("cooldown_seconds", 300), confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), alarm_level=configured_alarm_level, ) elif algorithm_type == "illegal_parking": self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15), parking_countdown_sec=algo_params.get("parking_countdown_sec", 300), confirm_clear_sec=algo_params.get("confirm_clear_sec", 30), cooldown_sec=algo_params.get("cooldown_sec", 600), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), alarm_level=configured_alarm_level, ) elif algorithm_type == "vehicle_congestion": self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( count_threshold=algo_params.get("count_threshold", 3), confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60), confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), cooldown_sec=algo_params.get("cooldown_sec", 600), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), alarm_level=configured_alarm_level, ) self._registered_keys.add(cache_key) def process( self, roi_id: str, bind_id: str, camera_id: str, algorithm_type: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: """处理检测结果(支持绑定ID)""" key = f"{roi_id}_{bind_id}" algo = self.algorithms.get(roi_id, {}).get(key, {}).get(algorithm_type) if algo is None: return [] return algo.process(roi_id, camera_id, tracks, current_time) def update_roi_params( self, roi_id: str, bind_id: str, algorithm_type: str, params: Dict[str, Any], ): """更新算法参数(支持绑定ID)""" key = f"{roi_id}_{bind_id}" if roi_id in self.algorithms and key in self.algorithms[roi_id] and algorithm_type in self.algorithms[roi_id][key]: algo = self.algorithms[roi_id][key][algorithm_type] for param_key, value in params.items(): if hasattr(algo, param_key): setattr(algo, param_key, value) def reset_algorithm(self, roi_id: str, bind_id: Optional[str] = None): """重置算法状态(支持绑定ID)""" if roi_id not in self.algorithms: return if bind_id: key = f"{roi_id}_{bind_id}" if key in self.algorithms[roi_id]: for algo in self.algorithms[roi_id][key].values(): algo.reset() # 清除注册缓存 self._registered_keys = { k for k in self._registered_keys if not (k[0] == roi_id and k[1] == bind_id) } else: for key in self.algorithms[roi_id]: for algo in self.algorithms[roi_id][key].values(): algo.reset() # 清除该 roi 的所有注册缓存 self._registered_keys = { k for k in self._registered_keys if k[0] != roi_id } def reset_all(self): """重置所有算法""" for roi_algorithms in self.algorithms.values(): for bind_algorithms in roi_algorithms.values(): for algo in bind_algorithms.values(): algo.reset() def remove_roi(self, roi_id: str): """移除ROI的所有算法""" if roi_id in self.algorithms: for key in list(self.algorithms[roi_id].keys()): self.reset_algorithm(roi_id, key.split("_")[-1] if "_" in key else None) del self.algorithms[roi_id] def remove_bind(self, roi_id: str, bind_id: str): """移除绑定的算法""" key = f"{roi_id}_{bind_id}" if roi_id in self.algorithms and key in self.algorithms[roi_id]: for algo in self.algorithms[roi_id][key].values(): algo.reset() del self.algorithms[roi_id][key] def get_status(self, roi_id: str) -> Dict[str, Any]: """获取算法状态""" status = {} if roi_id in self.algorithms: for key, bind_algorithms in self.algorithms[roi_id].items(): bind_id = key.split("_", 1)[-1] if "_" in key else "" for algo_type, algo in bind_algorithms.items(): if algo_type == "leave_post": status[f"{algo_type}_{bind_id}"] = { "state": getattr(algo, "state", "WAITING"), "alarm_sent": getattr(algo, "alarm_sent", False), } elif algo_type in ("illegal_parking", "vehicle_congestion"): status[f"{algo_type}_{bind_id}"] = algo.get_state() else: status[f"{algo_type}_{bind_id}"] = { "detection_count": len(getattr(algo, "detection_start", {})), } return status