Files
security-ai-edge/algorithms.py

374 lines
13 KiB
Python
Raw Normal View History

2026-01-29 18:33:12 +08:00
import os
import sys
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class LeavePostAlgorithm:
    """Per-ROI state machine that raises an alert when a post is left unattended.

    :meth:`process` is expected to be called once per frame. The states are:

    * ``WAITING``: no target seen yet; moves to ``ON_DUTY`` on the first frame
      with a matching detection inside the ROI.
    * ``ON_DUTY``: moves to ``LEAVING`` on the first frame without a target.
    * ``LEAVING``: returns to ``ON_DUTY`` if a target reappears, otherwise
      moves to ``OFF_DUTY`` after ``confirm_leave_sec`` seconds.
    * ``OFF_DUTY``: once a further ``confirm_leave_sec`` seconds have passed,
      emits a ``leave_post`` alert, at most once per ``cooldown_sec`` seconds.
    * ``NON_WORK_TIME``: entered whenever the frame time is outside
      ``working_hours``; no alerts are produced.
    """

    STATE_WAITING = "WAITING"
    STATE_ON_DUTY = "ON_DUTY"
    STATE_LEAVING = "LEAVING"
    STATE_OFF_DUTY = "OFF_DUTY"
    STATE_NON_WORK_TIME = "NON_WORK_TIME"

    def __init__(
        self,
        confirm_on_duty_sec: int = 10,
        confirm_leave_sec: int = 10,
        cooldown_sec: int = 300,
        working_hours: Optional[List[Dict]] = None,
        target_class: Optional[str] = "person",
    ):
        """Configure the detector.

        Args:
            confirm_on_duty_sec: Stored for configuration; in this class it is
                only used to size the detection-history window.
            confirm_leave_sec: Seconds without a target before ``LEAVING``
                becomes ``OFF_DUTY``, and again before ``OFF_DUTY`` alerts.
            cooldown_sec: Minimum seconds between two alerts for the same
                camera/ROI pair.
            working_hours: List of periods, each a mapping with ``"start"`` and
                ``"end"`` entries indexable as ``[hour, minute]``. Empty or
                ``None`` means "always working".
            target_class: Detection class to look for; a falsy value accepts
                every class.
        """
        self.confirm_on_duty_sec = confirm_on_duty_sec
        self.confirm_leave_sec = confirm_leave_sec
        self.cooldown_sec = cooldown_sec
        self.working_hours = working_hours or []
        self.target_class = target_class
        # Time of the last alert per "<camera_id>_<roi_id>" key.
        self.alert_cooldowns: Dict[str, datetime] = {}
        self.state: str = self.STATE_WAITING
        self.state_start_time: Optional[datetime] = None
        # (timestamp, target_present) samples, oldest first.
        self.detection_history: deque = deque()
        # Exposed through get_state(); process() only ever resets this to False.
        self.alarm_sent: bool = False
        # Exposed through get_state(); not updated by process().
        self.last_person_time: Optional[datetime] = None

    def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
        """Return True when ``dt`` (default: now) falls inside a working period.

        Periods are half-open ``[start, end)`` at minute resolution. A period
        whose start is later than its end is treated as crossing midnight
        (e.g. 22:00-06:00 covers 23:30 and 05:59).
        """
        if not self.working_hours:
            return True
        dt = dt or datetime.now()
        current_minutes = dt.hour * 60 + dt.minute
        for period in self.working_hours:
            start_minutes = period["start"][0] * 60 + period["start"][1]
            end_minutes = period["end"][0] * 60 + period["end"][1]
            if start_minutes <= end_minutes:
                if start_minutes <= current_minutes < end_minutes:
                    return True
            elif current_minutes >= start_minutes or current_minutes < end_minutes:
                # Overnight period: matches late evening or early morning.
                return True
        return False

    def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
        """Return True if ``detection["matched_rois"]`` contains ``roi_id``."""
        return any(
            roi.get("roi_id") == roi_id
            for roi in detection.get("matched_rois", [])
        )

    def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
        """Return True if the detection's ``"class"`` matches (or no filter is set)."""
        if not target_class:
            return True
        return detection.get("class") == target_class

    def _prune_history(self, current_time: datetime) -> None:
        """Drop history samples older than the longest confirmation window."""
        window_sec = max(self.confirm_on_duty_sec, self.confirm_leave_sec)
        history = self.detection_history
        while history and (current_time - history[0][0]).total_seconds() > window_sec:
            history.popleft()

    def _get_detection_window(self, current_time: datetime) -> List[bool]:
        """Return the presence flags still inside the window, oldest first."""
        self._prune_history(current_time)
        return [has_person for _, has_person in self.detection_history]

    def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
        """Return the ``"bbox"`` of the first track inside the ROI, else ``[]``."""
        for det in tracks:
            if self._check_detection_in_roi(det, roi_id):
                return det.get("bbox", [])
        return []

    def process(
        self,
        roi_id: str,
        camera_id: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Advance the state machine by one frame and return any alerts.

        Args:
            roi_id: ROI this instance is watching.
            camera_id: Camera the frame came from; part of the cooldown key.
            tracks: Detections for the frame. Each is a dict read through the
                keys ``"matched_rois"``, ``"class"`` and ``"bbox"``.
            current_time: Timestamp of the frame; defaults to ``datetime.now()``.

        Returns:
            A list with zero or one ``leave_post`` alert dicts.
        """
        current_time = current_time or datetime.now()
        alerts: List[Dict] = []

        if not self._is_in_working_hours(current_time):
            self.state = self.STATE_NON_WORK_TIME
            self.detection_history.clear()
            self.alarm_sent = False
            return []

        if self.state == self.STATE_NON_WORK_TIME:
            # Working hours just started: begin from a clean slate.
            self.state = self.STATE_WAITING
            self.detection_history.clear()
            self.alarm_sent = False

        roi_has_person = any(
            self._check_detection_in_roi(det, roi_id)
            and self._check_target_class(det, self.target_class)
            for det in tracks
        )

        if self.state == self.STATE_WAITING:
            if roi_has_person:
                self.state = self.STATE_ON_DUTY
                self.state_start_time = current_time
                self.detection_history.clear()
                self.detection_history.append((current_time, True))
        elif self.state == self.STATE_ON_DUTY:
            self.detection_history.append((current_time, roi_has_person))
            # Keep the history bounded; it would otherwise grow every frame.
            self._prune_history(current_time)
            if not roi_has_person:
                self.state = self.STATE_LEAVING
                self.state_start_time = current_time
        elif self.state == self.STATE_LEAVING:
            self.detection_history.append((current_time, roi_has_person))
            self._prune_history(current_time)
            elapsed = (current_time - self.state_start_time).total_seconds()
            if roi_has_person:
                self.state = self.STATE_ON_DUTY
                self.state_start_time = current_time
            elif elapsed >= self.confirm_leave_sec:
                self.state = self.STATE_OFF_DUTY
                self.state_start_time = current_time
        elif self.state == self.STATE_OFF_DUTY:
            # Measured from the moment OFF_DUTY was entered.
            elapsed = (current_time - self.state_start_time).total_seconds()
            if roi_has_person:
                self.state = self.STATE_ON_DUTY
                self.state_start_time = current_time
            elif elapsed >= self.confirm_leave_sec:
                cooldown_key = f"{camera_id}_{roi_id}"
                # Use the frame timestamp (not the wall clock) so the cooldown
                # follows the same time base as the rest of the state machine.
                last_alert = self.alert_cooldowns.get(cooldown_key)
                if (
                    last_alert is None
                    or (current_time - last_alert).total_seconds() > self.cooldown_sec
                ):
                    bbox = self._get_latest_bbox(tracks, roi_id)
                    elapsed_minutes = int(elapsed / 60)
                    alerts.append({
                        "track_id": roi_id,
                        "camera_id": camera_id,
                        "bbox": bbox,
                        "off_duty_duration": elapsed,
                        "alert_type": "leave_post",
                        "message": f"离岗超过 {elapsed_minutes} 分钟",
                    })
                    self.alert_cooldowns[cooldown_key] = current_time
        return alerts

    def reset(self):
        """Return to ``WAITING`` and forget history, flags and cooldowns."""
        self.state = self.STATE_WAITING
        self.state_start_time = None
        self.detection_history.clear()
        self.alarm_sent = False
        self.last_person_time = None
        self.alert_cooldowns.clear()

    def get_state(self, roi_id: str) -> Dict[str, Any]:
        """Return a snapshot of the current state (``roi_id`` is not used)."""
        return {
            "state": self.state,
            "alarm_sent": self.alarm_sent,
            "last_person_time": self.last_person_time,
        }
class IntrusionAlgorithm:
    """Raise an ``intrusion`` alert when a target stays inside an ROI.

    State is kept per ``"<camera_id>_<roi_id>"`` key. A target must be seen
    for ``confirm_seconds`` before an alert fires. While the target keeps
    being detected, further alerts are held back for ``cooldown_seconds``.
    Any frame without a matching detection clears the pending confirmation
    and re-arms the alert for that key.
    """

    def __init__(
        self,
        cooldown_seconds: int = 120,
        confirm_seconds: int = 3,
        target_class: Optional[str] = None,
    ):
        """Store the thresholds and create empty per-key bookkeeping.

        Args:
            cooldown_seconds: Minimum seconds between alerts while the target
                remains detected.
            confirm_seconds: Seconds of presence required before alerting.
            target_class: Detection class to match; a falsy value accepts all.
        """
        self.cooldown_seconds = cooldown_seconds
        self.confirm_seconds = confirm_seconds
        self.target_class = target_class
        # Timestamp of the most recent alert per key.
        self.last_alert_time: Dict[str, datetime] = {}
        # True while the key is inside an alert/cooldown cycle.
        self.alert_triggered: Dict[str, bool] = {}
        # When the current presence streak began (None right after an alert).
        self.detection_start: Dict[str, Optional[datetime]] = {}

    def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
        """Tell whether ``roi_id`` appears among ``detection["matched_rois"]``."""
        return any(
            entry.get("roi_id") == roi_id
            for entry in detection.get("matched_rois", [])
        )

    def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
        """Tell whether the detection passes the (optional) class filter."""
        return (not target_class) or detection.get("class") == target_class

    def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
        """Return the ``"bbox"`` of the first track inside the ROI, else ``[]``."""
        return next(
            (
                track.get("bbox", [])
                for track in tracks
                if self._check_detection_in_roi(track, roi_id)
            ),
            [],
        )

    def process(
        self,
        roi_id: str,
        camera_id: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Evaluate one frame and return zero or one ``intrusion`` alerts.

        Args:
            roi_id: ROI being evaluated.
            camera_id: Camera the frame came from.
            tracks: Detections for the frame, read through the keys
                ``"matched_rois"``, ``"class"`` and ``"bbox"``.
            current_time: Timestamp of the frame; defaults to ``datetime.now()``.
        """
        current_time = current_time or datetime.now()
        key = f"{camera_id}_{roi_id}"

        target_present = any(
            self._check_detection_in_roi(track, roi_id)
            and self._check_target_class(track, self.target_class)
            for track in tracks
        )
        if not target_present:
            # Streak broken: forget the pending confirmation and re-arm.
            self.detection_start.pop(key, None)
            self.alert_triggered[key] = False
            return []

        if self.alert_triggered.get(key, False):
            previous_alert = self.last_alert_time.get(key, datetime.min)
            since_alert = (current_time - previous_alert).total_seconds()
            if since_alert < self.cooldown_seconds:
                return []
            # Cooldown is over; a fresh confirmation period starts below.
            self.alert_triggered[key] = False

        streak_start = self.detection_start.get(key)
        if streak_start is None:
            streak_start = current_time
            self.detection_start[key] = streak_start

        present_for = (current_time - streak_start).total_seconds()
        if present_for < self.confirm_seconds:
            return []

        alert = {
            "roi_id": roi_id,
            "camera_id": camera_id,
            "bbox": self._get_latest_bbox(tracks, roi_id),
            "alert_type": "intrusion",
            "message": "检测到周界入侵",
        }
        self.last_alert_time[key] = current_time
        self.alert_triggered[key] = True
        self.detection_start[key] = None
        return [alert]

    def reset(self):
        """Forget all per-key alert and confirmation state."""
        for table in (self.last_alert_time, self.alert_triggered, self.detection_start):
            table.clear()
class AlgorithmManager:
    """Registry of algorithm instances, keyed by ROI id and algorithm type.

    ``self.algorithms`` maps ``roi_id -> {algorithm_type: instance}``. The
    manager builds instances from default parameters plus per-ROI overrides
    and forwards per-frame processing and resets to them.
    """

    def __init__(self, working_hours: Optional[List[Dict]] = None):
        """Create an empty registry.

        Args:
            working_hours: Fallback working hours handed to ``leave_post``
                algorithms that do not bring their own.
        """
        self.algorithms: Dict[str, Dict[str, Any]] = {}
        self.working_hours = working_hours or []
        self.default_params = {
            "leave_post": {
                "confirm_on_duty_sec": 10,
                "confirm_leave_sec": 10,
                "cooldown_sec": 300,
                "target_class": "person",
            },
            "intrusion": {
                "cooldown_seconds": 120,
                "confirm_seconds": 3,
                "target_class": None,
            },
        }

    def register_algorithm(
        self,
        roi_id: str,
        algorithm_type: str,
        params: Optional[Dict[str, Any]] = None,
    ):
        """Create the algorithm for ``roi_id`` unless it already exists.

        ``params`` override the defaults for ``algorithm_type``. Types other
        than ``"leave_post"`` and ``"intrusion"`` create no instance, but the
        ROI entry itself is still created.
        """
        roi_slot = self.algorithms.setdefault(roi_id, {})
        if algorithm_type in roi_slot:
            return

        settings = dict(self.default_params.get(algorithm_type, {}))
        if params:
            settings.update(params)

        if algorithm_type == "leave_post":
            # Per-ROI hours win; otherwise fall back to the manager-wide ones.
            hours = settings.get("working_hours") or self.working_hours
            roi_slot["leave_post"] = LeavePostAlgorithm(
                confirm_on_duty_sec=settings.get("confirm_on_duty_sec", 10),
                confirm_leave_sec=settings.get("confirm_leave_sec", 10),
                cooldown_sec=settings.get("cooldown_sec", 300),
                working_hours=hours,
                target_class=settings.get("target_class", "person"),
            )
        elif algorithm_type == "intrusion":
            roi_slot["intrusion"] = IntrusionAlgorithm(
                cooldown_seconds=settings.get("cooldown_seconds", 120),
                confirm_seconds=settings.get("confirm_seconds", 3),
                target_class=settings.get("target_class"),
            )

    def process(
        self,
        roi_id: str,
        camera_id: str,
        algorithm_type: str,
        tracks: List[Dict],
        current_time: Optional[datetime] = None,
    ) -> List[Dict]:
        """Run the registered algorithm on one frame; ``[]`` if none is registered."""
        handler = self.algorithms.get(roi_id, {}).get(algorithm_type)
        if handler is None:
            return []
        return handler.process(roi_id, camera_id, tracks, current_time)

    def update_roi_params(
        self,
        roi_id: str,
        algorithm_type: str,
        params: Dict[str, Any],
    ):
        """Overwrite existing attributes of a registered algorithm.

        Keys in ``params`` that are not already attributes of the instance
        are ignored. Unknown ROI/type pairs are a no-op.
        """
        try:
            target = self.algorithms[roi_id][algorithm_type]
        except KeyError:
            return
        for attr_name, new_value in params.items():
            if not hasattr(target, attr_name):
                continue
            setattr(target, attr_name, new_value)

    def reset_algorithm(self, roi_id: str, algorithm_type: Optional[str] = None):
        """Reset one algorithm of an ROI, or all of them when no type is given."""
        try:
            roi_slot = self.algorithms[roi_id]
        except KeyError:
            return
        if not algorithm_type:
            for instance in roi_slot.values():
                instance.reset()
            return
        if algorithm_type in roi_slot:
            roi_slot[algorithm_type].reset()

    def reset_all(self):
        """Reset every registered algorithm of every ROI."""
        every_instance = (
            instance
            for roi_slot in self.algorithms.values()
            for instance in roi_slot.values()
        )
        for instance in every_instance:
            instance.reset()

    def remove_roi(self, roi_id: str):
        """Reset and then drop everything registered for ``roi_id``."""
        if roi_id not in self.algorithms:
            return
        self.reset_algorithm(roi_id)
        del self.algorithms[roi_id]

    def get_status(self, roi_id: str) -> Dict[str, Any]:
        """Return a per-algorithm status summary for ``roi_id`` (``{}`` if unknown).

        ``leave_post`` entries report ``state`` and ``alarm_sent``; every other
        type reports ``detection_count``, the number of keys currently held in
        the instance's ``detection_start`` mapping.
        """
        summary: Dict[str, Any] = {}
        for algo_type, instance in self.algorithms.get(roi_id, {}).items():
            if algo_type == "leave_post":
                summary[algo_type] = {
                    "state": getattr(instance, "state", "WAITING"),
                    "alarm_sent": getattr(instance, "alarm_sent", False),
                }
            else:
                summary[algo_type] = {
                    "detection_count": len(getattr(instance, "detection_start", {})),
                }
        return summary