Files
Security_AI_integrated/inference/rules/algorithms.py
2026-01-22 11:43:40 +08:00

396 lines
15 KiB
Python

import os
import sys
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sort import Sort
class LeavePostAlgorithm:
STATE_INIT = "INIT_STATE"
STATE_ON_DUTY_CONFIRMING = "ON_DUTY_CONFIRMING"
STATE_ON_DUTY = "ON_DUTY"
STATE_OFF_DUTY_CONFIRMING = "OFF_DUTY_CONFIRMING"
STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN"
STATE_NON_WORK_TIME = "NON_WORK_TIME"
def __init__(
self,
threshold_sec: int = 360,
confirm_sec: int = 30,
return_sec: int = 5,
working_hours: Optional[List[Dict]] = None,
roi_polygon: Optional[List[Tuple[float, float]]] = None,
):
self.threshold_sec = threshold_sec
self.confirm_sec = confirm_sec
self.return_sec = return_sec
self.working_hours = working_hours or []
self.roi_polygon = roi_polygon
self.alert_cooldowns: Dict[str, datetime] = {}
self.cooldown_seconds = 300
self.state: str = self.STATE_INIT
self.state_start_time: Optional[datetime] = None
self.initial_state_start_time: Optional[datetime] = None
self.has_ever_seen_person: bool = False
self.on_duty_window = deque()
self.alarm_sent: bool = False
self.last_person_seen_time: Optional[datetime] = None
self.on_duty_start_time: Optional[datetime] = None
def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
if not self.working_hours:
return True
dt = dt or datetime.now()
current_minutes = dt.hour * 60 + dt.minute
for period in self.working_hours:
start_minutes = period["start"][0] * 60 + period["start"][1]
end_minutes = period["end"][0] * 60 + period["end"][1]
if start_minutes <= current_minutes < end_minutes:
return True
return False
def is_point_in_roi(self, x: float, y: float) -> bool:
if not self.roi_polygon or len(self.roi_polygon) < 3:
return False
from shapely.geometry import Point, Polygon
point = Point(x, y)
polygon = Polygon(self.roi_polygon)
return polygon.contains(point)
def check_roi_has_person(self, detections: List[Dict]) -> bool:
for det in detections:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return True
return False
def process(
self,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
current_time = current_time or datetime.now()
roi_has_person = self.check_roi_has_person(tracks)
in_work = self.is_in_working_hours(current_time)
alerts = []
if not in_work:
self.state = self.STATE_NON_WORK_TIME
self.on_duty_start_time = None
self.last_person_seen_time = None
self.initial_state_start_time = None
self.has_ever_seen_person = False
self.on_duty_window.clear()
self.alarm_sent = False
roi_has_person = False
else:
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_INIT
self.initial_state_start_time = current_time
self.has_ever_seen_person = False
self.on_duty_window.clear()
self.alarm_sent = False
if self.state == self.STATE_INIT:
self.initial_state_start_time = current_time
self.has_ever_seen_person = False
self.alarm_sent = False
if roi_has_person:
self.state = self.STATE_ON_DUTY_CONFIRMING
self.state_start_time = current_time
self.on_duty_window.clear()
self.on_duty_window.append((current_time, True))
self.has_ever_seen_person = True
else:
elapsed = (current_time - self.initial_state_start_time).total_seconds()
if elapsed >= self.threshold_sec:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
elif self.state == self.STATE_ON_DUTY_CONFIRMING:
self.on_duty_window.append((current_time, roi_has_person))
while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
self.on_duty_window.popleft()
hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1)
if hit_ratio >= 0.75 and (current_time - self.state_start_time).total_seconds() >= self.confirm_sec:
self.state = self.STATE_ON_DUTY
self.on_duty_start_time = None
self.last_person_seen_time = current_time
elif not roi_has_person and (current_time - self.state_start_time).total_seconds() > self.confirm_sec:
self.state = self.STATE_INIT
self.initial_state_start_time = current_time
elif self.state == self.STATE_ON_DUTY:
if roi_has_person:
self.last_person_seen_time = current_time
else:
self.state = self.STATE_OFF_DUTY_CONFIRMING
self.state_start_time = current_time
elif self.state == self.STATE_OFF_DUTY_CONFIRMING:
if roi_has_person:
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self.last_person_seen_time = current_time
elif (current_time - self.state_start_time).total_seconds() >= self.return_sec:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
elif self.state == self.STATE_OFF_DUTY_COUNTDOWN:
if roi_has_person:
self.state = self.STATE_ON_DUTY_CONFIRMING
self.state_start_time = current_time
self.on_duty_window.clear()
self.on_duty_window.append((current_time, True))
elif (current_time - self.state_start_time).total_seconds() >= self.threshold_sec:
if not self.alarm_sent:
cooldown_key = f"{camera_id}"
if cooldown_key not in self.alert_cooldowns or (
current_time - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
bbox = self.get_latest_bbox(tracks)
elapsed_minutes = int((current_time - self.state_start_time).total_seconds() / 60)
alerts.append({
"track_id": camera_id,
"bbox": bbox,
"off_duty_duration": (current_time - self.state_start_time).total_seconds(),
"alert_type": "leave_post",
"message": f"离岗超过 {elapsed_minutes} 分钟",
})
self.alarm_sent = True
self.alert_cooldowns[cooldown_key] = current_time
return alerts
def get_latest_bbox(self, tracks: List[Dict]) -> List[float]:
for det in tracks:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return bbox
if tracks:
return tracks[0].get("bbox", [])
return []
def reset(self):
self.state = self.STATE_INIT
self.state_start_time = None
self.initial_state_start_time = None
self.has_ever_seen_person = False
self.on_duty_window.clear()
self.alarm_sent = False
self.last_person_seen_time = None
self.on_duty_start_time = None
self.alert_cooldowns.clear()
def get_state(self, track_id: str) -> Optional[Dict[str, Any]]:
return {
"state": self.state,
"alarm_sent": self.alarm_sent,
"has_ever_seen_person": self.has_ever_seen_person,
}
class IntrusionAlgorithm:
def __init__(
self,
check_interval_sec: float = 1.0,
direction_sensitive: bool = False,
):
self.check_interval_sec = check_interval_sec
self.direction_sensitive = direction_sensitive
self.last_check_times: Dict[str, float] = {}
self.tracker = Sort(max_age=5, min_hits=1, iou_threshold=0.3)
self.alert_cooldowns: Dict[str, datetime] = {}
self.cooldown_seconds = 300
def process(
self,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
if not tracks:
return []
detections = []
for track in tracks:
bbox = track.get("bbox", [])
if len(bbox) >= 4:
detections.append(bbox + [track.get("conf", 0.0)])
if not detections:
return []
current_ts = current_time.timestamp() if current_time else datetime.now().timestamp()
if camera_id in self.last_check_times:
if current_ts - self.last_check_times[camera_id] < self.check_interval_sec:
return []
self.last_check_times[camera_id] = current_ts
detections = np.array(detections)
tracked = self.tracker.update(detections)
alerts = []
now = datetime.now()
for track_data in tracked:
x1, y1, x2, y2, track_id = track_data
cooldown_key = f"{camera_id}_{int(track_id)}"
if cooldown_key not in self.alert_cooldowns or (
now - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
alerts.append({
"track_id": str(int(track_id)),
"bbox": [x1, y1, x2, y2],
"alert_type": "intrusion",
"confidence": track_data[4] if len(track_data) > 4 else 0.0,
"message": "检测到周界入侵",
})
self.alert_cooldowns[cooldown_key] = now
return alerts
def reset(self):
self.last_check_times.clear()
self.alert_cooldowns.clear()
class AlgorithmManager:
def __init__(self, working_hours: Optional[List[Dict]] = None):
self.algorithms: Dict[str, Dict[str, Any]] = {}
self.working_hours = working_hours or []
self.default_params = {
"leave_post": {
"threshold_sec": 360,
"confirm_sec": 30,
"return_sec": 5,
},
"intrusion": {
"check_interval_sec": 1.0,
"direction_sensitive": False,
},
}
def register_algorithm(
self,
roi_id: str,
algorithm_type: str,
params: Optional[Dict[str, Any]] = None,
):
if roi_id in self.algorithms:
if algorithm_type in self.algorithms[roi_id]:
return
if roi_id not in self.algorithms:
self.algorithms[roi_id] = {}
algo_params = self.default_params.get(algorithm_type, {})
if params:
algo_params.update(params)
if algorithm_type == "leave_post":
self.algorithms[roi_id]["leave_post"] = LeavePostAlgorithm(
threshold_sec=algo_params.get("threshold_sec", 360),
confirm_sec=algo_params.get("confirm_sec", 30),
return_sec=algo_params.get("return_sec", 5),
working_hours=self.working_hours,
)
elif algorithm_type == "intrusion":
self.algorithms[roi_id]["intrusion"] = IntrusionAlgorithm(
check_interval_sec=algo_params.get("check_interval_sec", 1.0),
direction_sensitive=algo_params.get("direction_sensitive", False),
)
def process(
self,
roi_id: str,
camera_id: str,
algorithm_type: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
algo = self.algorithms.get(roi_id, {}).get(algorithm_type)
if algo is None:
return []
return algo.process(camera_id, tracks, current_time)
def update_roi_params(
self,
roi_id: str,
algorithm_type: str,
params: Dict[str, Any],
):
if roi_id in self.algorithms and algorithm_type in self.algorithms[roi_id]:
algo = self.algorithms[roi_id][algorithm_type]
for key, value in params.items():
if hasattr(algo, key):
setattr(algo, key, value)
def reset_algorithm(self, roi_id: str, algorithm_type: Optional[str] = None):
if roi_id not in self.algorithms:
return
if algorithm_type:
if algorithm_type in self.algorithms[roi_id]:
self.algorithms[roi_id][algorithm_type].reset()
else:
for algo in self.algorithms[roi_id].values():
algo.reset()
def reset_all(self):
for roi_algorithms in self.algorithms.values():
for algo in roi_algorithms.values():
algo.reset()
def remove_roi(self, roi_id: str):
if roi_id in self.algorithms:
self.reset_algorithm(roi_id)
del self.algorithms[roi_id]
def get_status(self, roi_id: str) -> Dict[str, Any]:
status = {}
if roi_id in self.algorithms:
for algo_type, algo in self.algorithms[roi_id].items():
if algo_type == "leave_post":
status[algo_type] = {
"state": getattr(algo, "state", "INIT_STATE"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
else:
status[algo_type] = {
"track_count": len(getattr(algo, "track_states", {})),
}
return status