From 10b9fb18042de2d013d016e7aedb34db6c230054 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 22 Jan 2026 11:03:01 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=E4=BD=BF=E7=94=A8=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=9C=BA=E4=BC=98=E5=8C=96=E7=A6=BB=E5=B2=97=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E5=B9=B6=E7=A7=BB=E9=99=A4=E6=8E=92?= =?UTF-8?q?=E5=BA=8F=E7=9B=B8=E5=85=B3=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inference/rules/algorithms.py | 234 +++++++++++++++++++++++----------- 1 file changed, 162 insertions(+), 72 deletions(-) diff --git a/inference/rules/algorithms.py b/inference/rules/algorithms.py index cee4667..d2fc4b9 100644 --- a/inference/rules/algorithms.py +++ b/inference/rules/algorithms.py @@ -1,5 +1,6 @@ import os import sys +from collections import deque from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple @@ -8,28 +9,41 @@ import numpy as np sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from sort import Sort - class LeavePostAlgorithm: + STATE_INIT = "INIT_STATE" + STATE_ON_DUTY_CONFIRMING = "ON_DUTY_CONFIRMING" + STATE_ON_DUTY = "ON_DUTY" + STATE_OFF_DUTY_CONFIRMING = "OFF_DUTY_CONFIRMING" + STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" + STATE_NON_WORK_TIME = "NON_WORK_TIME" + def __init__( self, threshold_sec: int = 360, confirm_sec: int = 30, return_sec: int = 5, working_hours: Optional[List[Dict]] = None, + roi_polygon: Optional[List[Tuple[float, float]]] = None, ): self.threshold_sec = threshold_sec self.confirm_sec = confirm_sec self.return_sec = return_sec self.working_hours = working_hours or [] - - self.track_states: Dict[str, Dict[str, Any]] = {} - self.tracker = Sort(max_age=10, min_hits=2, iou_threshold=0.3) + self.roi_polygon = roi_polygon self.alert_cooldowns: Dict[str, datetime] = {} self.cooldown_seconds = 300 + self.state: str = self.STATE_INIT + self.state_start_time: Optional[datetime] = None + self.initial_state_start_time: Optional[datetime] = None + self.has_ever_seen_person: bool = False + self.on_duty_window = deque() + self.alarm_sent: bool = False + self.last_person_seen_time: Optional[datetime] = None + self.on_duty_start_time: Optional[datetime] = None + def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: if not self.working_hours: return True @@ -45,90 +59,160 @@ class LeavePostAlgorithm: return False + def is_point_in_roi(self, x: float, y: float) -> bool: + if not self.roi_polygon or len(self.roi_polygon) < 3: + return False + from shapely.geometry import Point, Polygon + point = Point(x, y) + polygon = Polygon(self.roi_polygon) + return polygon.contains(point) + + def check_roi_has_person(self, detections: List[Dict]) -> bool: + for det in detections: + bbox = det.get("bbox", []) + if len(bbox) >= 4: + x1, y1, x2, y2 = bbox[:4] + center_x = (x1 + x2) / 2 + center_y = (y1 + y2) / 2 + if self.is_point_in_roi(center_x, center_y): + return True + return False + def process( self, camera_id: str, tracks: List[Dict], current_time: Optional[datetime] = None, ) -> List[Dict]: - if not self.is_in_working_hours(current_time): - return [] - - if not tracks: - return [] - - detections = [] - for track in tracks: - bbox = track.get("bbox", []) - if len(bbox) >= 4: - detections.append(bbox + [track.get("conf", 0.0)]) - - if not detections: - return [] - - detections = np.array(detections) - tracked = self.tracker.update(detections) - - alerts = [] current_time = current_time or datetime.now() - for track_data in tracked: - x1, y1, x2, y2, track_id = track_data - track_id = str(int(track_id)) + roi_has_person = self.check_roi_has_person(tracks) + in_work = self.is_in_working_hours(current_time) + alerts = [] - if track_id not in self.track_states: - self.track_states[track_id] = { - "first_seen": current_time, - "last_seen": current_time, - "off_duty_start": None, - "alerted": False, - "last_position": (x1, y1, x2, y2), - } + if not in_work: + self.state = self.STATE_NON_WORK_TIME + self.on_duty_start_time = None + self.last_person_seen_time = None + self.initial_state_start_time = None + self.has_ever_seen_person = False + self.on_duty_window.clear() + self.alarm_sent = False + roi_has_person = False + else: + if self.state == self.STATE_NON_WORK_TIME: + self.state = self.STATE_INIT + self.initial_state_start_time = current_time + self.has_ever_seen_person = False + self.on_duty_window.clear() + self.alarm_sent = False - state = self.track_states[track_id] - state["last_seen"] = current_time - state["last_position"] = (x1, y1, x2, y2) + if self.state == self.STATE_INIT: + self.initial_state_start_time = current_time + self.has_ever_seen_person = False + self.alarm_sent = False - if state["off_duty_start"] is None: - off_duty_duration = (current_time - state["first_seen"]).total_seconds() - if off_duty_duration > self.confirm_sec: - state["off_duty_start"] = current_time - else: - elapsed = (current_time - state["off_duty_start"]).total_seconds() - if elapsed > self.threshold_sec: - if not state["alerted"]: - cooldown_key = f"{camera_id}_{track_id}" - now = datetime.now() - if cooldown_key not in self.alert_cooldowns or ( - now - self.alert_cooldowns[cooldown_key] - ).total_seconds() > self.cooldown_seconds: - alerts.append({ - "track_id": track_id, - "bbox": [x1, y1, x2, y2], - "off_duty_duration": elapsed, - "alert_type": "leave_post", - "message": f"离岗超过 {int(elapsed / 60)} 分钟", - }) - state["alerted"] = True - self.alert_cooldowns[cooldown_key] = now + if roi_has_person: + self.state = self.STATE_ON_DUTY_CONFIRMING + self.state_start_time = current_time + self.on_duty_window.clear() + self.on_duty_window.append((current_time, True)) + self.has_ever_seen_person = True else: - if elapsed < self.return_sec: - state["off_duty_start"] = None - state["alerted"] = False + elapsed = (current_time - self.initial_state_start_time).total_seconds() + if elapsed >= self.threshold_sec: + self.state = self.STATE_OFF_DUTY_COUNTDOWN + self.state_start_time = current_time + self.alarm_sent = False - cleanup_time = current_time - timedelta(minutes=5) - for track_id, state in list(self.track_states.items()): - if state["last_seen"] < cleanup_time: - del self.track_states[track_id] + elif self.state == self.STATE_ON_DUTY_CONFIRMING: + self.on_duty_window.append((current_time, roi_has_person)) + while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec: + self.on_duty_window.popleft() + + hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1) + + if hit_ratio >= 0.75 and (current_time - self.state_start_time).total_seconds() >= self.confirm_sec: + self.state = self.STATE_ON_DUTY + self.on_duty_start_time = None + self.last_person_seen_time = current_time + elif not roi_has_person and (current_time - self.state_start_time).total_seconds() > self.confirm_sec: + self.state = self.STATE_INIT + self.initial_state_start_time = current_time + + elif self.state == self.STATE_ON_DUTY: + if roi_has_person: + self.last_person_seen_time = current_time + else: + self.state = self.STATE_OFF_DUTY_CONFIRMING + self.state_start_time = current_time + + elif self.state == self.STATE_OFF_DUTY_CONFIRMING: + if roi_has_person: + self.state = self.STATE_ON_DUTY + self.state_start_time = current_time + self.last_person_seen_time = current_time + elif (current_time - self.state_start_time).total_seconds() >= self.return_sec: + self.state = self.STATE_OFF_DUTY_COUNTDOWN + self.state_start_time = current_time + self.alarm_sent = False + + elif self.state == self.STATE_OFF_DUTY_COUNTDOWN: + if roi_has_person: + self.state = self.STATE_ON_DUTY_CONFIRMING + self.state_start_time = current_time + self.on_duty_window.clear() + self.on_duty_window.append((current_time, True)) + elif (current_time - self.state_start_time).total_seconds() >= self.threshold_sec: + if not self.alarm_sent: + cooldown_key = f"{camera_id}" + if cooldown_key not in self.alert_cooldowns or ( + current_time - self.alert_cooldowns[cooldown_key] + ).total_seconds() > self.cooldown_seconds: + bbox = self.get_latest_bbox(tracks) + elapsed_minutes = int((current_time - self.state_start_time).total_seconds() / 60) + alerts.append({ + "track_id": camera_id, + "bbox": bbox, + "off_duty_duration": (current_time - self.state_start_time).total_seconds(), + "alert_type": "leave_post", + "message": f"离岗超过 {elapsed_minutes} 分钟", + }) + self.alarm_sent = True + self.alert_cooldowns[cooldown_key] = current_time return alerts + def get_latest_bbox(self, tracks: List[Dict]) -> List[float]: + for det in tracks: + bbox = det.get("bbox", []) + if len(bbox) >= 4: + x1, y1, x2, y2 = bbox[:4] + center_x = (x1 + x2) / 2 + center_y = (y1 + y2) / 2 + if self.is_point_in_roi(center_x, center_y): + return bbox + if tracks: + return tracks[0].get("bbox", []) + return [] + def reset(self): - self.track_states.clear() + self.state = self.STATE_INIT + self.state_start_time = None + self.initial_state_start_time = None + self.has_ever_seen_person = False + self.on_duty_window.clear() + self.alarm_sent = False + self.last_person_seen_time = None + self.on_duty_start_time = None self.alert_cooldowns.clear() def get_state(self, track_id: str) -> Optional[Dict[str, Any]]: - return self.track_states.get(track_id) + return { + "state": self.state, + "alarm_sent": self.alarm_sent, + "has_ever_seen_person": self.has_ever_seen_person, + } class IntrusionAlgorithm: @@ -297,7 +381,13 @@ class AlgorithmManager: status = {} if roi_id in self.algorithms: for algo_type, algo in self.algorithms[roi_id].items(): - status[algo_type] = { - "track_count": len(getattr(algo, "track_states", {})), - } + if algo_type == "leave_post": + status[algo_type] = { + "state": getattr(algo, "state", "INIT_STATE"), + "alarm_sent": getattr(algo, "alarm_sent", False), + } + else: + status[algo_type] = { + "track_count": len(getattr(algo, "track_states", {})), + } return status