chore: 注释掉人群聚集检测算法

- 注释 CrowdDetectionAlgorithm 类
- 注释 AlgorithmManager 中的 crowd_detection 相关代码
- 保留代码以便后续需要时启用

当前仅保留2个算法:leave_post、intrusion

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 15:16:47 +08:00
parent e4605e8702
commit 3065bb948e

View File

@@ -306,82 +306,82 @@ class IntrusionAlgorithm:
self.detection_start.clear()
class CrowdDetectionAlgorithm:
"""人群聚集检测算法"""
def __init__(
self,
max_count: int = 10,
cooldown_seconds: int = 300,
target_class: Optional[str] = "person",
):
self.max_count = max_count
self.cooldown_seconds = cooldown_seconds
self.target_class = target_class
self.last_alert_time: Dict[str, datetime] = {}
self.alert_triggered: Dict[str, bool] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
if not target_class:
return True
return detection.get("class") == target_class
def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]:
bboxes = []
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
bboxes.append(det.get("bbox", []))
return bboxes
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
current_time = current_time or datetime.now()
key = f"{camera_id}_{roi_id}"
person_count = 0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
person_count += 1
if person_count <= self.max_count:
self.alert_triggered[key] = False
return []
if self.alert_triggered.get(key, False):
elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds()
if elapsed_since_alert < self.cooldown_seconds:
return []
self.alert_triggered[key] = False
bboxes = self._get_bboxes(tracks, roi_id)
self.last_alert_time[key] = current_time
self.alert_triggered[key] = True
return [{
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bboxes[0] if bboxes else [],
"alert_type": "crowd_detection",
"message": f"检测到人群聚集,当前人数: {person_count}",
"count": person_count,
}]
def reset(self):
self.last_alert_time.clear()
self.alert_triggered.clear()
# class CrowdDetectionAlgorithm:
# """人群聚集检测算法 - 暂时注释,后续需要时再启用"""
#
# def __init__(
# self,
# max_count: int = 10,
# cooldown_seconds: int = 300,
# target_class: Optional[str] = "person",
# ):
# self.max_count = max_count
# self.cooldown_seconds = cooldown_seconds
# self.target_class = target_class
#
# self.last_alert_time: Dict[str, datetime] = {}
# self.alert_triggered: Dict[str, bool] = {}
#
# def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
# matched_rois = detection.get("matched_rois", [])
# for roi in matched_rois:
# if roi.get("roi_id") == roi_id:
# return True
# return False
#
# def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
# if not target_class:
# return True
# return detection.get("class") == target_class
#
# def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]:
# bboxes = []
# for det in tracks:
# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
# bboxes.append(det.get("bbox", []))
# return bboxes
#
# def process(
# self,
# roi_id: str,
# camera_id: str,
# tracks: List[Dict],
# current_time: Optional[datetime] = None,
# ) -> List[Dict]:
# current_time = current_time or datetime.now()
# key = f"{camera_id}_{roi_id}"
#
# person_count = 0
# for det in tracks:
# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
# person_count += 1
#
# if person_count <= self.max_count:
# self.alert_triggered[key] = False
# return []
#
# if self.alert_triggered.get(key, False):
# elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds()
# if elapsed_since_alert < self.cooldown_seconds:
# return []
# self.alert_triggered[key] = False
#
# bboxes = self._get_bboxes(tracks, roi_id)
# self.last_alert_time[key] = current_time
# self.alert_triggered[key] = True
#
# return [{
# "roi_id": roi_id,
# "camera_id": camera_id,
# "bbox": bboxes[0] if bboxes else [],
# "alert_type": "crowd_detection",
# "message": f"检测到人群聚集,当前人数: {person_count}",
# "count": person_count,
# }]
#
# def reset(self):
# self.last_alert_time.clear()
# self.alert_triggered.clear()
class AlgorithmManager:
@@ -403,11 +403,11 @@ class AlgorithmManager:
"confirm_seconds": 5,
"target_class": None,
},
"crowd_detection": {
"max_count": 10,
"cooldown_seconds": 300,
"target_class": "person",
},
# "crowd_detection": {
# "max_count": 10,
# "cooldown_seconds": 300,
# "target_class": "person",
# },
}
self._pubsub = None
@@ -635,13 +635,13 @@ class AlgorithmManager:
confirm_seconds=algo_params.get("confirm_seconds", 5),
target_class=algo_params.get("target_class"),
)
elif algorithm_type == "crowd_detection":
from algorithms import CrowdDetectionAlgorithm
self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm(
max_count=algo_params.get("max_count", 10),
cooldown_seconds=algo_params.get("cooldown_seconds", 300),
target_class=algo_params.get("target_class", "person"),
)
# elif algorithm_type == "crowd_detection":
# from algorithms import CrowdDetectionAlgorithm
# self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm(
# max_count=algo_params.get("max_count", 10),
# cooldown_seconds=algo_params.get("cooldown_seconds", 300),
# target_class=algo_params.get("target_class", "person"),
# )
self._registered_keys.add(cache_key)