From 3065bb948e8cef3fe3c86d8896d913823f5a486e Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Feb 2026 15:16:47 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=B3=A8=E9=87=8A=E6=8E=89=E4=BA=BA?= =?UTF-8?q?=E7=BE=A4=E8=81=9A=E9=9B=86=E6=A3=80=E6=B5=8B=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 注释 CrowdDetectionAlgorithm 类 - 注释 AlgorithmManager 中的 crowd_detection 相关代码 - 保留代码以便后续需要时启用 当前仅保留2个算法:leave_post、intrusion Co-Authored-By: Claude Opus 4.5 --- algorithms.py | 176 +++++++++++++++++++++++++------------------------- 1 file changed, 88 insertions(+), 88 deletions(-) diff --git a/algorithms.py b/algorithms.py index 5f5b68b..b9afe3e 100644 --- a/algorithms.py +++ b/algorithms.py @@ -306,82 +306,82 @@ class IntrusionAlgorithm: self.detection_start.clear() -class CrowdDetectionAlgorithm: - """人群聚集检测算法""" - - def __init__( - self, - max_count: int = 10, - cooldown_seconds: int = 300, - target_class: Optional[str] = "person", - ): - self.max_count = max_count - self.cooldown_seconds = cooldown_seconds - self.target_class = target_class - - self.last_alert_time: Dict[str, datetime] = {} - self.alert_triggered: Dict[str, bool] = {} - - def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: - matched_rois = detection.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True - return False - - def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: - if not target_class: - return True - return detection.get("class") == target_class - - def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]: - bboxes = [] - for det in tracks: - if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): - bboxes.append(det.get("bbox", [])) - return bboxes - - def process( - self, - roi_id: str, - camera_id: str, - tracks: List[Dict], - current_time: Optional[datetime] = None, - ) -> List[Dict]: - current_time = current_time or datetime.now() - key = f"{camera_id}_{roi_id}" - - person_count = 0 - for det in tracks: - if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): - person_count += 1 - - if person_count <= self.max_count: - self.alert_triggered[key] = False - return [] - - if self.alert_triggered.get(key, False): - elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds() - if elapsed_since_alert < self.cooldown_seconds: - return [] - self.alert_triggered[key] = False - - bboxes = self._get_bboxes(tracks, roi_id) - self.last_alert_time[key] = current_time - self.alert_triggered[key] = True - - return [{ - "roi_id": roi_id, - "camera_id": camera_id, - "bbox": bboxes[0] if bboxes else [], - "alert_type": "crowd_detection", - "message": f"检测到人群聚集,当前人数: {person_count}", - "count": person_count, - }] - - def reset(self): - self.last_alert_time.clear() - self.alert_triggered.clear() +# class CrowdDetectionAlgorithm: +# """人群聚集检测算法 - 暂时注释,后续需要时再启用""" +# +# def __init__( +# self, +# max_count: int = 10, +# cooldown_seconds: int = 300, +# target_class: Optional[str] = "person", +# ): +# self.max_count = max_count +# self.cooldown_seconds = cooldown_seconds +# self.target_class = target_class +# +# self.last_alert_time: Dict[str, datetime] = {} +# self.alert_triggered: Dict[str, bool] = {} +# +# def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: +# matched_rois = detection.get("matched_rois", []) +# for roi in matched_rois: +# if roi.get("roi_id") == roi_id: +# return True +# return False +# +# def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: +# if not target_class: +# return True +# return detection.get("class") == target_class +# +# def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]: +# bboxes = [] +# for det in tracks: +# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): +# bboxes.append(det.get("bbox", [])) +# return bboxes +# +# def process( +# self, +# roi_id: str, +# camera_id: str, +# tracks: List[Dict], +# current_time: Optional[datetime] = None, +# ) -> List[Dict]: +# current_time = current_time or datetime.now() +# key = f"{camera_id}_{roi_id}" +# +# person_count = 0 +# for det in tracks: +# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): +# person_count += 1 +# +# if person_count <= self.max_count: +# self.alert_triggered[key] = False +# return [] +# +# if self.alert_triggered.get(key, False): +# elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds() +# if elapsed_since_alert < self.cooldown_seconds: +# return [] +# self.alert_triggered[key] = False +# +# bboxes = self._get_bboxes(tracks, roi_id) +# self.last_alert_time[key] = current_time +# self.alert_triggered[key] = True +# +# return [{ +# "roi_id": roi_id, +# "camera_id": camera_id, +# "bbox": bboxes[0] if bboxes else [], +# "alert_type": "crowd_detection", +# "message": f"检测到人群聚集,当前人数: {person_count}", +# "count": person_count, +# }] +# +# def reset(self): +# self.last_alert_time.clear() +# self.alert_triggered.clear() class AlgorithmManager: @@ -403,11 +403,11 @@ class AlgorithmManager: "confirm_seconds": 5, "target_class": None, }, - "crowd_detection": { - "max_count": 10, - "cooldown_seconds": 300, - "target_class": "person", - }, + # "crowd_detection": { + # "max_count": 10, + # "cooldown_seconds": 300, + # "target_class": "person", + # }, } self._pubsub = None @@ -635,13 +635,13 @@ class AlgorithmManager: confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), ) - elif algorithm_type == "crowd_detection": - from algorithms import CrowdDetectionAlgorithm - self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm( - max_count=algo_params.get("max_count", 10), - cooldown_seconds=algo_params.get("cooldown_seconds", 300), - target_class=algo_params.get("target_class", "person"), - ) + # elif algorithm_type == "crowd_detection": + # from algorithms import CrowdDetectionAlgorithm + # self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm( + # max_count=algo_params.get("max_count", 10), + # cooldown_seconds=algo_params.get("cooldown_seconds", 300), + # target_class=algo_params.get("target_class", "person"), + # ) self._registered_keys.add(cache_key)