Files
Security_AI_integrated/inference/rules/algorithms.py
16337 20f295a491
Some checks failed
Python Test / test (push) Has been cancelled
修复 ROI 区域内人员离开十几分钟未触发告警的问题。
1. 仍在 confirm_sec 滑动窗口内(未完成确认)
2. threshold_sec 阈值设置过长(需检查数据库实际配置值)
3. 新算法未被正确调用
2026-01-22 12:22:26 +08:00

349 lines
12 KiB
Python

import os
import sys
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sort import Sort
class LeavePostAlgorithm:
    """State machine that alerts when a monitored post stays unattended.

    States:
        ON_DUTY: presence samples are kept in a sliding window covering the
            last ``confirm_sec`` seconds. When the window contains no positive
            sample, the countdown starts.
        OFF_DUTY_COUNTDOWN: absence time accumulates from the moment the
            countdown started. Once it reaches ``threshold_sec`` a single
            alert is emitted (subject to the per-camera cooldown). Any
            detection inside the ROI switches back to ON_DUTY.
        NON_WORK_TIME: the current time is outside the configured working
            hours; tracking state is cleared and nothing is reported.
    """

    STATE_ON_DUTY = "ON_DUTY"
    STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN"
    STATE_NON_WORK_TIME = "NON_WORK_TIME"

    def __init__(
        self,
        threshold_sec: int = 300,
        confirm_sec: int = 10,
        return_sec: int = 30,
        working_hours: Optional[List[Dict]] = None,
        roi_polygon: Optional[List[Tuple[float, float]]] = None,
    ):
        """Create the algorithm.

        Args:
            threshold_sec: Absence duration (seconds) after which an alert is
                raised.
            confirm_sec: Length (seconds) of the sliding presence window used
                while ON_DUTY.
            return_sec: Stored on the instance; not read by any method of this
                class.
            working_hours: List of ``{"start": [h, m], "end": [h, m]}``
                periods. Empty or ``None`` means "always working".
            roi_polygon: Polygon vertices. With fewer than three vertices (or
                ``None``) every point is treated as inside the ROI.
        """
        self.threshold_sec = threshold_sec
        self.confirm_sec = confirm_sec
        self.return_sec = return_sec
        self.working_hours = working_hours or []
        self.roi_polygon = roi_polygon
        # Last alert time per camera, used to rate-limit repeated alerts.
        self.alert_cooldowns: Dict[str, datetime] = {}
        self.cooldown_seconds = 300
        self.state: str = self.STATE_ON_DUTY
        # Time at which the OFF_DUTY_COUNTDOWN state was entered.
        self.state_start_time: Optional[datetime] = None
        # (timestamp, person_detected) samples from the last confirm_sec seconds.
        self.on_duty_window = deque()
        self.alarm_sent: bool = False
        self.last_person_seen_time: Optional[datetime] = None

    def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
        """Return True when ``dt`` (default: now) falls inside a working period.

        Periods are half-open ``[start, end)`` at minute resolution. A period
        whose start is later than its end is treated as crossing midnight
        (e.g. 22:00-06:00). A period with start == end never matches.
        """
        if not self.working_hours:
            return True
        dt = dt or datetime.now()
        current_minutes = dt.hour * 60 + dt.minute
        for period in self.working_hours:
            start_minutes = period["start"][0] * 60 + period["start"][1]
            end_minutes = period["end"][0] * 60 + period["end"][1]
            if start_minutes <= end_minutes:
                if start_minutes <= current_minutes < end_minutes:
                    return True
            elif current_minutes >= start_minutes or current_minutes < end_minutes:
                # Overnight period: matches from start until midnight and from
                # midnight until end.
                return True
        return False

    def is_point_in_roi(self, x: float, y: float) -> bool:
        """Return True when (x, y) lies inside the ROI polygon.

        When no usable polygon is configured, every point counts as inside.
        """
        if not self.roi_polygon or len(self.roi_polygon) < 3:
            return True
        # Imported lazily so the module loads without shapely when no ROI
        # polygon is configured.
        from shapely.geometry import Point, Polygon
        point = Point(x, y)
        polygon = Polygon(self.roi_polygon)
        return polygon.contains(point)

    def _bboxes_in_roi(self, detections: List[Dict]):
        """Yield the bbox of every detection whose box center is inside the ROI."""
        for det in detections:
            bbox = det.get("bbox", [])
            if len(bbox) < 4:
                continue
            x1, y1, x2, y2 = bbox[:4]
            if self.is_point_in_roi((x1 + x2) / 2, (y1 + y2) / 2):
                yield bbox

    def check_roi_has_person(self, detections: List[Dict]) -> bool:
        """Return True when at least one detection is centered inside the ROI."""
        return any(True for _ in self._bboxes_in_roi(detections))

    def process(
        self,
        camera_id: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Advance the state machine by one frame and return new alerts.

        Args:
            camera_id: Camera identifier; used as the cooldown key and as the
                alert's ``track_id``.
            tracks: Detections for this frame, each a dict with a ``bbox``
                entry of at least four numbers (x1, y1, x2, y2).
            current_time: Frame timestamp; defaults to ``datetime.now()``.

        Returns:
            A list with zero or one ``leave_post`` alert dict.
        """
        current_time = current_time or datetime.now()

        if not self.is_in_working_hours(current_time):
            self.state = self.STATE_NON_WORK_TIME
            # Clear the countdown start so no stale timestamp survives into
            # the next working period.
            self.state_start_time = None
            self.last_person_seen_time = None
            self.on_duty_window.clear()
            self.alarm_sent = False
            return []

        roi_has_person = self.check_roi_has_person(tracks)

        if self.state == self.STATE_NON_WORK_TIME:
            # Working hours just started: begin from a clean ON_DUTY state and
            # evaluate this frame immediately below.
            self.state = self.STATE_ON_DUTY
            self.on_duty_window.clear()
            self.alarm_sent = False

        if self.state == self.STATE_ON_DUTY:
            self._update_on_duty(roi_has_person, current_time)
            return []
        if self.state == self.STATE_OFF_DUTY_COUNTDOWN:
            return self._update_countdown(
                camera_id, tracks, roi_has_person, current_time
            )
        return []

    def _update_on_duty(self, roi_has_person: bool, current_time: datetime) -> None:
        """Record a presence sample and start the countdown if the window is empty of hits."""
        window = self.on_duty_window
        window.append((current_time, roi_has_person))
        # Drop samples older than confirm_sec.
        while window and (current_time - window[0][0]).total_seconds() > self.confirm_sec:
            window.popleft()
        hits = sum(1 for _, detected in window if detected)
        hit_ratio = hits / max(len(window), 1)

        if not roi_has_person and hit_ratio == 0:
            # Nobody seen in the whole window: start counting absence time.
            self.state = self.STATE_OFF_DUTY_COUNTDOWN
            self.state_start_time = current_time
            self.alarm_sent = False
        elif (
            hit_ratio >= 0.75
            and (current_time - window[0][0]).total_seconds() >= self.confirm_sec
        ):
            self.last_person_seen_time = current_time

    def _update_countdown(
        self,
        camera_id: str,
        tracks: List[Dict],
        roi_has_person: bool,
        current_time: datetime,
    ) -> List[Dict]:
        """Handle one frame in OFF_DUTY_COUNTDOWN; return the alert list."""
        if roi_has_person:
            self.state = self.STATE_ON_DUTY
            self.on_duty_window.clear()
            self.on_duty_window.append((current_time, True))
            self.alarm_sent = False
            return []

        if self.state_start_time is None:
            # The state was entered without a start time (e.g. set from
            # outside); start counting now instead of failing.
            self.state_start_time = current_time
        elapsed = (current_time - self.state_start_time).total_seconds()
        if elapsed < self.threshold_sec or self.alarm_sent:
            return []

        cooldown_key = f"{camera_id}"
        last_alert = self.alert_cooldowns.get(cooldown_key)
        if (
            last_alert is not None
            and (current_time - last_alert).total_seconds() <= self.cooldown_seconds
        ):
            # Still cooling down; alarm_sent stays False so a later frame
            # retries once the cooldown has passed.
            return []

        elapsed_minutes = int(elapsed / 60)
        self.alarm_sent = True
        self.alert_cooldowns[cooldown_key] = current_time
        return [{
            "track_id": camera_id,
            "bbox": self.get_latest_bbox(tracks),
            "off_duty_duration": elapsed,
            "alert_type": "leave_post",
            "message": f"离岗超过 {elapsed_minutes} 分钟",
        }]

    def get_latest_bbox(self, tracks: List[Dict]) -> List[float]:
        """Return the first bbox centered in the ROI, else the first track's bbox, else []."""
        bbox = next(self._bboxes_in_roi(tracks), None)
        if bbox is not None:
            return bbox
        if tracks:
            return tracks[0].get("bbox", [])
        return []

    def reset(self):
        """Return to the initial ON_DUTY state and forget all cooldowns."""
        self.state = self.STATE_ON_DUTY
        self.state_start_time = None
        self.on_duty_window.clear()
        self.alarm_sent = False
        self.last_person_seen_time = None
        self.alert_cooldowns.clear()

    def get_state(self, track_id: str) -> Optional[Dict[str, Any]]:
        """Return a snapshot of the state machine.

        ``track_id`` is accepted but not used: the state is kept per
        algorithm instance.
        """
        return {
            "state": self.state,
            "alarm_sent": self.alarm_sent,
            "last_person_seen_time": self.last_person_seen_time,
        }
class IntrusionAlgorithm:
    """Raise an ``intrusion`` alert for every tracked object, rate-limited per track.

    Detections are fed to a SORT tracker; each returned track produces one
    alert unless the same camera/track pair alerted within
    ``cooldown_seconds``.
    """

    def __init__(
        self,
        check_interval_sec: float = 1.0,
        direction_sensitive: bool = False,
    ):
        """Create the algorithm.

        Args:
            check_interval_sec: Minimum time (seconds) between two evaluated
                frames of the same camera.
            direction_sensitive: Stored on the instance; not read by any
                method of this class.
        """
        self.check_interval_sec = check_interval_sec
        self.direction_sensitive = direction_sensitive
        # Timestamp (epoch seconds) of the last evaluated frame per camera.
        self.last_check_times: Dict[str, float] = {}
        self.tracker = Sort(max_age=5, min_hits=1, iou_threshold=0.3)
        # Last alert time per "<camera_id>_<track_id>" key.
        self.alert_cooldowns: Dict[str, datetime] = {}
        self.cooldown_seconds = 300

    @staticmethod
    def _match_confidence(box, detections) -> float:
        """Return the confidence of the detection that overlaps ``box`` most (by IoU).

        ``box`` is (x1, y1, x2, y2); each detection is
        (x1, y1, x2, y2, conf). Returns 0.0 when nothing overlaps.
        """
        bx1, by1, bx2, by2 = box
        best_iou = 0.0
        best_conf = 0.0
        for dx1, dy1, dx2, dy2, conf in detections:
            inter_w = min(bx2, dx2) - max(bx1, dx1)
            inter_h = min(by2, dy2) - max(by1, dy1)
            if inter_w <= 0 or inter_h <= 0:
                continue
            inter = inter_w * inter_h
            union = (bx2 - bx1) * (by2 - by1) + (dx2 - dx1) * (dy2 - dy1) - inter
            iou = inter / union if union > 0 else 0.0
            if iou > best_iou:
                best_iou, best_conf = iou, conf
        return float(best_conf)

    def process(
        self,
        camera_id: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Track this frame's detections and return new intrusion alerts.

        Args:
            camera_id: Camera identifier; part of the throttle and cooldown
                keys.
            tracks: Detections, each a dict with ``bbox`` (at least four
                numbers: x1, y1, x2, y2) and optionally ``conf``.
            current_time: Frame timestamp; defaults to ``datetime.now()``.
                It drives both the check interval and the alert cooldown.

        Returns:
            One alert dict per tracked object that is not in cooldown. Empty
            when there are no usable detections or the frame is throttled.
        """
        if not tracks:
            return []

        detections = []
        for track in tracks:
            bbox = track.get("bbox", [])
            if len(bbox) >= 4:
                # Use exactly four coordinates so every row has five columns,
                # whatever sequence type or length the bbox has.
                row = [float(v) for v in bbox[:4]]
                row.append(float(track.get("conf") or 0.0))
                detections.append(row)
        if not detections:
            return []

        # A single clock for throttling and cooldown keeps both consistent
        # when the caller supplies frame timestamps.
        now = current_time or datetime.now()
        current_ts = now.timestamp()
        last_check = self.last_check_times.get(camera_id)
        if last_check is not None and current_ts - last_check < self.check_interval_sec:
            return []
        self.last_check_times[camera_id] = current_ts

        tracked = self.tracker.update(np.array(detections))

        alerts = []
        for track_data in tracked:
            # The fifth column is unpacked as the track id here, so it cannot
            # also serve as the confidence.
            x1, y1, x2, y2, track_id = track_data[:5]
            cooldown_key = f"{camera_id}_{int(track_id)}"
            last_alert = self.alert_cooldowns.get(cooldown_key)
            if (
                last_alert is not None
                and (now - last_alert).total_seconds() <= self.cooldown_seconds
            ):
                continue
            box = [float(x1), float(y1), float(x2), float(y2)]
            alerts.append({
                "track_id": str(int(track_id)),
                "bbox": box,
                "alert_type": "intrusion",
                # Confidence is taken from the input detection that best
                # overlaps the tracked box.
                "confidence": self._match_confidence(box, detections),
                "message": "检测到周界入侵",
            })
            self.alert_cooldowns[cooldown_key] = now
        return alerts

    def reset(self):
        """Forget throttle timestamps and alert cooldowns.

        NOTE(review): the tracker instance is kept as is; confirm whether its
        internal tracks should also be discarded on reset.
        """
        self.last_check_times.clear()
        self.alert_cooldowns.clear()
class AlgorithmManager:
    """Registry of algorithm instances keyed by ROI id and algorithm type."""

    def __init__(self, working_hours: Optional[List[Dict]] = None):
        """Create an empty registry.

        Args:
            working_hours: Working periods handed to every ``leave_post``
                algorithm created by this manager.
        """
        # roi_id -> {algorithm_type -> algorithm instance}
        self.algorithms: Dict[str, Dict[str, Any]] = {}
        self.working_hours = working_hours or []
        self.default_params = {
            "leave_post": {
                "threshold_sec": 300,
                "confirm_sec": 10,
                "return_sec": 30,
            },
            "intrusion": {
                "check_interval_sec": 1.0,
                "direction_sensitive": False,
            },
        }

    def register_algorithm(
        self,
        roi_id: str,
        algorithm_type: str,
        params: Optional[Dict[str, Any]] = None,
    ):
        """Create the algorithm for ``roi_id`` unless one of that type exists.

        Args:
            roi_id: ROI identifier.
            algorithm_type: ``"leave_post"`` or ``"intrusion"``; any other
                value registers the ROI without creating an algorithm.
            params: Per-ROI overrides applied on top of ``default_params``.
        """
        roi_algorithms = self.algorithms.setdefault(roi_id, {})
        if algorithm_type in roi_algorithms:
            return

        # Merge into a fresh dict: updating the dict stored in default_params
        # would leak this ROI's overrides into every later registration.
        algo_params = dict(self.default_params.get(algorithm_type, {}))
        if params:
            algo_params.update(params)

        if algorithm_type == "leave_post":
            roi_algorithms["leave_post"] = LeavePostAlgorithm(
                threshold_sec=algo_params.get("threshold_sec", 300),
                confirm_sec=algo_params.get("confirm_sec", 10),
                return_sec=algo_params.get("return_sec", 30),
                working_hours=self.working_hours,
            )
        elif algorithm_type == "intrusion":
            roi_algorithms["intrusion"] = IntrusionAlgorithm(
                check_interval_sec=algo_params.get("check_interval_sec", 1.0),
                direction_sensitive=algo_params.get("direction_sensitive", False),
            )

    def process(
        self,
        roi_id: str,
        camera_id: str,
        algorithm_type: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Run the registered algorithm on one frame.

        Returns:
            The algorithm's alerts, or an empty list when no algorithm of
            ``algorithm_type`` is registered for ``roi_id``.
        """
        algo = self.algorithms.get(roi_id, {}).get(algorithm_type)
        if algo is None:
            return []
        return algo.process(camera_id, tracks, current_time)

    def update_roi_params(
        self,
        roi_id: str,
        algorithm_type: str,
        params: Dict[str, Any],
    ):
        """Overwrite attributes of a registered algorithm.

        Only keys that already exist as attributes on the algorithm are
        applied; unknown keys and unregistered algorithms are ignored.
        """
        algo = self.algorithms.get(roi_id, {}).get(algorithm_type)
        if algo is None:
            return
        for key, value in params.items():
            if hasattr(algo, key):
                setattr(algo, key, value)

    def reset_algorithm(self, roi_id: str, algorithm_type: Optional[str] = None):
        """Reset one algorithm of an ROI, or all of them when no type is given."""
        roi_algorithms = self.algorithms.get(roi_id)
        if roi_algorithms is None:
            return
        if algorithm_type:
            if algorithm_type in roi_algorithms:
                roi_algorithms[algorithm_type].reset()
        else:
            for algo in roi_algorithms.values():
                algo.reset()

    def reset_all(self):
        """Reset every registered algorithm of every ROI."""
        for roi_algorithms in self.algorithms.values():
            for algo in roi_algorithms.values():
                algo.reset()

    def remove_roi(self, roi_id: str):
        """Reset and unregister all algorithms of ``roi_id``; no-op if unknown."""
        if roi_id in self.algorithms:
            self.reset_algorithm(roi_id)
            del self.algorithms[roi_id]

    def get_status(self, roi_id: str) -> Dict[str, Any]:
        """Return a per-algorithm status summary for ``roi_id``.

        ``leave_post`` entries report ``state`` and ``alarm_sent``; every
        other type reports ``track_count``. Unknown ROIs yield ``{}``.
        """
        status = {}
        for algo_type, algo in self.algorithms.get(roi_id, {}).items():
            if algo_type == "leave_post":
                status[algo_type] = {
                    "state": getattr(algo, "state", "INIT_STATE"),
                    "alarm_sent": getattr(algo, "alarm_sent", False),
                }
            else:
                # Falls back to 0 when the algorithm has no `track_states`
                # attribute.
                status[algo_type] = {
                    "track_count": len(getattr(algo, "track_states", {})),
                }
        return status