diff --git a/algorithms.py b/algorithms.py index ca668e8..f2a7c30 100644 --- a/algorithms.py +++ b/algorithms.py @@ -12,56 +12,73 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) class LeavePostAlgorithm: + STATE_WAITING = "WAITING" STATE_ON_DUTY = "ON_DUTY" - STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" + STATE_LEAVING = "LEAVING" + STATE_OFF_DUTY = "OFF_DUTY" STATE_NON_WORK_TIME = "NON_WORK_TIME" - STATE_INIT = "INIT" def __init__( self, - threshold_sec: int = 300, - confirm_sec: int = 10, - return_sec: int = 30, + confirm_on_duty_sec: int = 10, + confirm_leave_sec: int = 10, + cooldown_sec: int = 300, working_hours: Optional[List[Dict]] = None, + target_class: Optional[str] = "person", ): - self.threshold_sec = threshold_sec - self.confirm_sec = confirm_sec - self.return_sec = return_sec + self.confirm_on_duty_sec = confirm_on_duty_sec + self.confirm_leave_sec = confirm_leave_sec + self.cooldown_sec = cooldown_sec self.working_hours = working_hours or [] + self.target_class = target_class self.alert_cooldowns: Dict[str, datetime] = {} - self.cooldown_seconds = 300 - self.state: str = self.STATE_INIT + self.state: str = self.STATE_WAITING self.state_start_time: Optional[datetime] = None - self.on_duty_window = deque() - self.alarm_sent: bool = False - self.last_person_seen_time: Optional[datetime] = None - self.last_detection_time: Optional[datetime] = None - self.init_start_time: Optional[datetime] = None + self.detection_history: deque = deque() - def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: + self.alarm_sent: bool = False + self.last_person_time: Optional[datetime] = None + + def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: if not self.working_hours: return True - dt = dt or datetime.now() current_minutes = dt.hour * 60 + dt.minute - for period in self.working_hours: start_minutes = period["start"][0] * 60 + period["start"][1] end_minutes = period["end"][0] * 60 + period["end"][1] if start_minutes <= current_minutes < end_minutes: return True - return False - def check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: matched_rois = detection.get("matched_rois", []) for roi in matched_rois: if roi.get("roi_id") == roi_id: return True return False + def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: + if not target_class: + return True + return detection.get("class") == target_class + + def _get_detection_window(self, current_time: datetime) -> List[bool]: + detections = [] + while self.detection_history and (current_time - self.detection_history[0][0]).total_seconds() > max(self.confirm_on_duty_sec, self.confirm_leave_sec): + self.detection_history.popleft() + for _, has_person in self.detection_history: + detections.append(has_person) + return detections + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + for det in tracks: + if self._check_detection_in_roi(det, roi_id): + return det.get("bbox", []) + return [] + def process( self, roi_id: str, @@ -71,142 +88,123 @@ class LeavePostAlgorithm: ) -> List[Dict]: current_time = current_time or datetime.now() - roi_has_person = False - for det in tracks: - if self.check_detection_in_roi(det, roi_id): - roi_has_person = True - break - - in_work = self.is_in_working_hours(current_time) + in_work = self._is_in_working_hours(current_time) alerts = [] if not in_work: self.state = self.STATE_NON_WORK_TIME - self.last_person_seen_time = None - self.last_detection_time = None - self.on_duty_window.clear() + self.detection_history.clear() self.alarm_sent = False - self.init_start_time = None - else: - if self.state == self.STATE_NON_WORK_TIME: - self.state = self.STATE_INIT - self.init_start_time = current_time - self.on_duty_window.clear() - self.alarm_sent = False + return [] - if self.state == self.STATE_INIT: - if roi_has_person: - self.state = self.STATE_ON_DUTY - self.state_start_time = current_time - self.on_duty_window.clear() - self.on_duty_window.append((current_time, True)) - self.last_person_seen_time = current_time - self.last_detection_time = current_time - self.init_start_time = None - else: - if self.init_start_time is None: - self.init_start_time = current_time + if self.state == self.STATE_NON_WORK_TIME: + self.state = self.STATE_WAITING + self.detection_history.clear() + self.alarm_sent = False - elapsed_since_init = (current_time - self.init_start_time).total_seconds() - if elapsed_since_init >= self.threshold_sec: - self.state = self.STATE_OFF_DUTY_COUNTDOWN - self.state_start_time = current_time - self.alarm_sent = False + roi_has_person = False + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): + roi_has_person = True + break - elif self.state == self.STATE_ON_DUTY: - if roi_has_person: - self.last_person_seen_time = current_time - self.last_detection_time = current_time + if self.state == self.STATE_WAITING: + if roi_has_person: + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + self.detection_history.clear() + self.detection_history.append((current_time, True)) + else: + pass - self.on_duty_window.append((current_time, True)) - while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec: - self.on_duty_window.popleft() - else: - self.on_duty_window.append((current_time, False)) - while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec: - self.on_duty_window.popleft() + elif self.state == self.STATE_ON_DUTY: + self.detection_history.append((current_time, roi_has_person)) + if not roi_has_person: + self.state = self.STATE_LEAVING + self.state_start_time = current_time - hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1) + elif self.state == self.STATE_LEAVING: + self.detection_history.append((current_time, roi_has_person)) + elapsed = (current_time - self.state_start_time).total_seconds() - if hit_ratio == 0: - self.state = self.STATE_OFF_DUTY_COUNTDOWN - self.state_start_time = current_time - self.alarm_sent = False + if roi_has_person: + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + elif elapsed >= self.confirm_leave_sec: + self.state = self.STATE_OFF_DUTY + self.state_start_time = current_time - elif self.state == self.STATE_OFF_DUTY_COUNTDOWN: - elapsed = (current_time - self.state_start_time).total_seconds() + elif self.state == self.STATE_OFF_DUTY: + elapsed = (current_time - self.state_start_time).total_seconds() - if roi_has_person: - self.state = self.STATE_ON_DUTY - self.state_start_time = current_time - self.on_duty_window.clear() - self.on_duty_window.append((current_time, True)) - self.last_person_seen_time = current_time - self.alarm_sent = False - elif elapsed >= self.threshold_sec: - if not self.alarm_sent: - cooldown_key = f"{roi_id}" - if cooldown_key not in self.alert_cooldowns or ( - current_time - self.alert_cooldowns[cooldown_key] - ).total_seconds() > self.cooldown_seconds: - bbox = self.get_latest_bbox_in_roi(tracks, roi_id) - elapsed_minutes = int(elapsed / 60) - alerts.append({ - "track_id": roi_id, - "bbox": bbox, - "off_duty_duration": elapsed, - "alert_type": "leave_post", - "message": f"离岗超过 {elapsed_minutes} 分钟", - }) - self.alarm_sent = True - self.alert_cooldowns[cooldown_key] = current_time + if roi_has_person: + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + elif elapsed >= self.confirm_leave_sec: + cooldown_key = f"{camera_id}_{roi_id}" + now = datetime.now() + if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + bbox = self._get_latest_bbox(tracks, roi_id) + elapsed_minutes = int(elapsed / 60) + alerts.append({ + "track_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "off_duty_duration": elapsed, + "alert_type": "leave_post", + "message": f"离岗超过 {elapsed_minutes} 分钟", + }) + self.alert_cooldowns[cooldown_key] = now return alerts - def get_latest_bbox_in_roi(self, tracks: List[Dict], roi_id: str) -> List[float]: - for det in tracks: - if self.check_detection_in_roi(det, roi_id): - return det.get("bbox", []) - return [] - def reset(self): - self.state = self.STATE_INIT + self.state = self.STATE_WAITING self.state_start_time = None - self.on_duty_window.clear() + self.detection_history.clear() self.alarm_sent = False - self.last_person_seen_time = None - self.last_detection_time = None - self.init_start_time = None + self.last_person_time = None self.alert_cooldowns.clear() - def get_state(self, track_id: str) -> Optional[Dict[str, Any]]: + def get_state(self, roi_id: str) -> Dict[str, Any]: return { "state": self.state, "alarm_sent": self.alarm_sent, - "last_person_seen_time": self.last_person_seen_time, + "last_person_time": self.last_person_time, } class IntrusionAlgorithm: - def __init__(self, cooldown_seconds: int = 300): + def __init__( + self, + cooldown_seconds: int = 120, + confirm_seconds: int = 3, + target_class: Optional[str] = None, + ): self.cooldown_seconds = cooldown_seconds - self.last_alert_time: Dict[str, float] = {} - self.alert_triggered: Dict[str, bool] = {} + self.confirm_seconds = confirm_seconds + self.target_class = target_class - def is_roi_has_person(self, tracks: List[Dict], roi_id: str) -> bool: - for det in tracks: - matched_rois = det.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return True + self.last_alert_time: Dict[str, datetime] = {} + self.alert_triggered: Dict[str, bool] = {} + self.detection_start: Dict[str, Optional[datetime]] = {} + + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True return False - def get_latest_bbox_in_roi(self, tracks: List[Dict], roi_id: str) -> List[float]: + def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: + if not target_class: + return True + return detection.get("class") == target_class + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: - matched_rois = det.get("matched_rois", []) - for roi in matched_rois: - if roi.get("roi_id") == roi_id: - return det.get("bbox", []) + if self._check_detection_in_roi(det, roi_id): + return det.get("bbox", []) return [] def process( @@ -216,30 +214,41 @@ class IntrusionAlgorithm: tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: - roi_has_person = self.is_roi_has_person(tracks, roi_id) - - if not roi_has_person: - return [] - - now = time.monotonic() + current_time = current_time or datetime.now() key = f"{camera_id}_{roi_id}" - if key not in self.last_alert_time: - self.last_alert_time[key] = 0 - self.alert_triggered[key] = False + roi_has_person = False + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class): + roi_has_person = True + break - if now - self.last_alert_time[key] >= self.cooldown_seconds: - self.last_alert_time[key] = now + if not roi_has_person: + self.detection_start.pop(key, None) self.alert_triggered[key] = False - - if self.alert_triggered[key]: return [] - bbox = self.get_latest_bbox_in_roi(tracks, roi_id) + if self.alert_triggered.get(key, False): + elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds() + if elapsed_since_alert < self.cooldown_seconds: + return [] + self.alert_triggered[key] = False + + if self.detection_start.get(key) is None: + self.detection_start[key] = current_time + + elapsed = (current_time - self.detection_start[key]).total_seconds() + if elapsed < self.confirm_seconds: + return [] + + bbox = self._get_latest_bbox(tracks, roi_id) + self.last_alert_time[key] = current_time self.alert_triggered[key] = True + self.detection_start[key] = None return [{ "roi_id": roi_id, + "camera_id": camera_id, "bbox": bbox, "alert_type": "intrusion", "message": "检测到周界入侵", @@ -248,6 +257,7 @@ class IntrusionAlgorithm: def reset(self): self.last_alert_time.clear() self.alert_triggered.clear() + self.detection_start.clear() class AlgorithmManager: @@ -257,12 +267,15 @@ class AlgorithmManager: self.default_params = { "leave_post": { - "threshold_sec": 300, - "confirm_sec": 10, - "return_sec": 30, + "confirm_on_duty_sec": 10, + "confirm_leave_sec": 10, + "cooldown_sec": 300, + "target_class": "person", }, "intrusion": { - "cooldown_seconds": 300, + "cooldown_seconds": 120, + "confirm_seconds": 3, + "target_class": None, }, } @@ -272,28 +285,30 @@ class AlgorithmManager: algorithm_type: str, params: Optional[Dict[str, Any]] = None, ): - if roi_id in self.algorithms: - if algorithm_type in self.algorithms[roi_id]: - return + if roi_id in self.algorithms and algorithm_type in self.algorithms[roi_id]: + return if roi_id not in self.algorithms: self.algorithms[roi_id] = {} - algo_params = self.default_params.get(algorithm_type, {}) + algo_params = self.default_params.get(algorithm_type, {}).copy() if params: algo_params.update(params) if algorithm_type == "leave_post": roi_working_hours = algo_params.get("working_hours") or self.working_hours self.algorithms[roi_id]["leave_post"] = LeavePostAlgorithm( - threshold_sec=algo_params.get("threshold_sec", 300), - confirm_sec=algo_params.get("confirm_sec", 10), - return_sec=algo_params.get("return_sec", 30), + confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10), + confirm_leave_sec=algo_params.get("confirm_leave_sec", 10), + cooldown_sec=algo_params.get("cooldown_sec", 300), working_hours=roi_working_hours, + target_class=algo_params.get("target_class", "person"), ) elif algorithm_type == "intrusion": self.algorithms[roi_id]["intrusion"] = IntrusionAlgorithm( - cooldown_seconds=algo_params.get("cooldown_seconds", 300), + cooldown_seconds=algo_params.get("cooldown_seconds", 120), + confirm_seconds=algo_params.get("confirm_seconds", 3), + target_class=algo_params.get("target_class"), ) def process( @@ -348,11 +363,11 @@ class AlgorithmManager: for algo_type, algo in self.algorithms[roi_id].items(): if algo_type == "leave_post": status[algo_type] = { - "state": getattr(algo, "state", "INIT_STATE"), + "state": getattr(algo, "state", "WAITING"), "alarm_sent": getattr(algo, "alarm_sent", False), } else: status[algo_type] = { - "track_count": len(getattr(algo, "track_states", {})), + "detection_count": len(getattr(algo, "detection_start", {})), } return status