refactor:使用状态机优化离岗检测逻辑,并移除排序相关算法

This commit is contained in:
2026-01-22 11:03:01 +08:00
parent 1a94854c52
commit 10b9fb1804

View File

@@ -1,5 +1,6 @@
import os import os
import sys import sys
from collections import deque
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
@@ -8,28 +9,41 @@ import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sort import Sort
class LeavePostAlgorithm: class LeavePostAlgorithm:
STATE_INIT = "INIT_STATE"
STATE_ON_DUTY_CONFIRMING = "ON_DUTY_CONFIRMING"
STATE_ON_DUTY = "ON_DUTY"
STATE_OFF_DUTY_CONFIRMING = "OFF_DUTY_CONFIRMING"
STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN"
STATE_NON_WORK_TIME = "NON_WORK_TIME"
def __init__( def __init__(
self, self,
threshold_sec: int = 360, threshold_sec: int = 360,
confirm_sec: int = 30, confirm_sec: int = 30,
return_sec: int = 5, return_sec: int = 5,
working_hours: Optional[List[Dict]] = None, working_hours: Optional[List[Dict]] = None,
roi_polygon: Optional[List[Tuple[float, float]]] = None,
): ):
self.threshold_sec = threshold_sec self.threshold_sec = threshold_sec
self.confirm_sec = confirm_sec self.confirm_sec = confirm_sec
self.return_sec = return_sec self.return_sec = return_sec
self.working_hours = working_hours or [] self.working_hours = working_hours or []
self.roi_polygon = roi_polygon
self.track_states: Dict[str, Dict[str, Any]] = {}
self.tracker = Sort(max_age=10, min_hits=2, iou_threshold=0.3)
self.alert_cooldowns: Dict[str, datetime] = {} self.alert_cooldowns: Dict[str, datetime] = {}
self.cooldown_seconds = 300 self.cooldown_seconds = 300
self.state: str = self.STATE_INIT
self.state_start_time: Optional[datetime] = None
self.initial_state_start_time: Optional[datetime] = None
self.has_ever_seen_person: bool = False
self.on_duty_window = deque()
self.alarm_sent: bool = False
self.last_person_seen_time: Optional[datetime] = None
self.on_duty_start_time: Optional[datetime] = None
def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
if not self.working_hours: if not self.working_hours:
return True return True
@@ -45,90 +59,160 @@ class LeavePostAlgorithm:
return False return False
def is_point_in_roi(self, x: float, y: float) -> bool:
if not self.roi_polygon or len(self.roi_polygon) < 3:
return False
from shapely.geometry import Point, Polygon
point = Point(x, y)
polygon = Polygon(self.roi_polygon)
return polygon.contains(point)
def check_roi_has_person(self, detections: List[Dict]) -> bool:
for det in detections:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return True
return False
def process( def process(
self, self,
camera_id: str, camera_id: str,
tracks: List[Dict], tracks: List[Dict],
current_time: Optional[datetime] = None, current_time: Optional[datetime] = None,
) -> List[Dict]: ) -> List[Dict]:
if not self.is_in_working_hours(current_time):
return []
if not tracks:
return []
detections = []
for track in tracks:
bbox = track.get("bbox", [])
if len(bbox) >= 4:
detections.append(bbox + [track.get("conf", 0.0)])
if not detections:
return []
detections = np.array(detections)
tracked = self.tracker.update(detections)
alerts = []
current_time = current_time or datetime.now() current_time = current_time or datetime.now()
for track_data in tracked: roi_has_person = self.check_roi_has_person(tracks)
x1, y1, x2, y2, track_id = track_data in_work = self.is_in_working_hours(current_time)
track_id = str(int(track_id)) alerts = []
if track_id not in self.track_states: if not in_work:
self.track_states[track_id] = { self.state = self.STATE_NON_WORK_TIME
"first_seen": current_time, self.on_duty_start_time = None
"last_seen": current_time, self.last_person_seen_time = None
"off_duty_start": None, self.initial_state_start_time = None
"alerted": False, self.has_ever_seen_person = False
"last_position": (x1, y1, x2, y2), self.on_duty_window.clear()
} self.alarm_sent = False
roi_has_person = False
else:
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_INIT
self.initial_state_start_time = current_time
self.has_ever_seen_person = False
self.on_duty_window.clear()
self.alarm_sent = False
state = self.track_states[track_id] if self.state == self.STATE_INIT:
state["last_seen"] = current_time self.initial_state_start_time = current_time
state["last_position"] = (x1, y1, x2, y2) self.has_ever_seen_person = False
self.alarm_sent = False
if state["off_duty_start"] is None: if roi_has_person:
off_duty_duration = (current_time - state["first_seen"]).total_seconds() self.state = self.STATE_ON_DUTY_CONFIRMING
if off_duty_duration > self.confirm_sec: self.state_start_time = current_time
state["off_duty_start"] = current_time self.on_duty_window.clear()
else: self.on_duty_window.append((current_time, True))
elapsed = (current_time - state["off_duty_start"]).total_seconds() self.has_ever_seen_person = True
if elapsed > self.threshold_sec:
if not state["alerted"]:
cooldown_key = f"{camera_id}_{track_id}"
now = datetime.now()
if cooldown_key not in self.alert_cooldowns or (
now - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
alerts.append({
"track_id": track_id,
"bbox": [x1, y1, x2, y2],
"off_duty_duration": elapsed,
"alert_type": "leave_post",
"message": f"离岗超过 {int(elapsed / 60)} 分钟",
})
state["alerted"] = True
self.alert_cooldowns[cooldown_key] = now
else: else:
if elapsed < self.return_sec: elapsed = (current_time - self.initial_state_start_time).total_seconds()
state["off_duty_start"] = None if elapsed >= self.threshold_sec:
state["alerted"] = False self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
cleanup_time = current_time - timedelta(minutes=5) elif self.state == self.STATE_ON_DUTY_CONFIRMING:
for track_id, state in list(self.track_states.items()): self.on_duty_window.append((current_time, roi_has_person))
if state["last_seen"] < cleanup_time: while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
del self.track_states[track_id] self.on_duty_window.popleft()
hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1)
if hit_ratio >= 0.75 and (current_time - self.state_start_time).total_seconds() >= self.confirm_sec:
self.state = self.STATE_ON_DUTY
self.on_duty_start_time = None
self.last_person_seen_time = current_time
elif not roi_has_person and (current_time - self.state_start_time).total_seconds() > self.confirm_sec:
self.state = self.STATE_INIT
self.initial_state_start_time = current_time
elif self.state == self.STATE_ON_DUTY:
if roi_has_person:
self.last_person_seen_time = current_time
else:
self.state = self.STATE_OFF_DUTY_CONFIRMING
self.state_start_time = current_time
elif self.state == self.STATE_OFF_DUTY_CONFIRMING:
if roi_has_person:
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self.last_person_seen_time = current_time
elif (current_time - self.state_start_time).total_seconds() >= self.return_sec:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
elif self.state == self.STATE_OFF_DUTY_COUNTDOWN:
if roi_has_person:
self.state = self.STATE_ON_DUTY_CONFIRMING
self.state_start_time = current_time
self.on_duty_window.clear()
self.on_duty_window.append((current_time, True))
elif (current_time - self.state_start_time).total_seconds() >= self.threshold_sec:
if not self.alarm_sent:
cooldown_key = f"{camera_id}"
if cooldown_key not in self.alert_cooldowns or (
current_time - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
bbox = self.get_latest_bbox(tracks)
elapsed_minutes = int((current_time - self.state_start_time).total_seconds() / 60)
alerts.append({
"track_id": camera_id,
"bbox": bbox,
"off_duty_duration": (current_time - self.state_start_time).total_seconds(),
"alert_type": "leave_post",
"message": f"离岗超过 {elapsed_minutes} 分钟",
})
self.alarm_sent = True
self.alert_cooldowns[cooldown_key] = current_time
return alerts return alerts
def get_latest_bbox(self, tracks: List[Dict]) -> List[float]:
for det in tracks:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return bbox
if tracks:
return tracks[0].get("bbox", [])
return []
def reset(self): def reset(self):
self.track_states.clear() self.state = self.STATE_INIT
self.state_start_time = None
self.initial_state_start_time = None
self.has_ever_seen_person = False
self.on_duty_window.clear()
self.alarm_sent = False
self.last_person_seen_time = None
self.on_duty_start_time = None
self.alert_cooldowns.clear() self.alert_cooldowns.clear()
def get_state(self, track_id: str) -> Optional[Dict[str, Any]]: def get_state(self, track_id: str) -> Optional[Dict[str, Any]]:
return self.track_states.get(track_id) return {
"state": self.state,
"alarm_sent": self.alarm_sent,
"has_ever_seen_person": self.has_ever_seen_person,
}
class IntrusionAlgorithm: class IntrusionAlgorithm:
@@ -297,7 +381,13 @@ class AlgorithmManager:
status = {} status = {}
if roi_id in self.algorithms: if roi_id in self.algorithms:
for algo_type, algo in self.algorithms[roi_id].items(): for algo_type, algo in self.algorithms[roi_id].items():
status[algo_type] = { if algo_type == "leave_post":
"track_count": len(getattr(algo, "track_states", {})), status[algo_type] = {
} "state": getattr(algo, "state", "INIT_STATE"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
else:
status[algo_type] = {
"track_count": len(getattr(algo, "track_states", {})),
}
return status return status