Files
security-ai-edge/algorithms.py
16337 7496a6fe04 feat(aiot): 实现离岗倒计时功能 - 修复持续时长过短问题
问题描述:
- 用户反馈告警持续时长只有48秒、103秒
- 预期:离岗倒计时5分钟后才告警,持续时长应>5.5分钟
- 根因:代码在"离岗确认"后立即触发告警,缺少倒计时环节

业务流程:
1. 上岗确认期:10秒(confirm_on_duty_sec)
2. 离岗确认期:30秒(confirm_leave_sec)
3. 离岗倒计时:300秒(leave_countdown_sec)← 新增
4. 告警冷却期:600秒(cooldown_sec)

修改内容:
1. LeavePostAlgorithm 构造函数
   - 新增 leave_countdown_sec 参数(默认300秒)
   - 新增 _off_duty_start_time 状态变量
   - 新增 _alarm_triggered 告警标志

2. LEAVING → OFF_DUTY 状态转换(Line 197-207)
   - 移除立即告警逻辑
   - 进入OFF_DUTY后仅记录时间,开始倒计时

3. OFF_DUTY 状态处理(Line 209-258)
   - 新增倒计时检查:off_duty_elapsed >= leave_countdown_sec
   - 倒计时结束才触发告警
   - 人员回岗时检查是否已告警,决定是否发送resolve事件

4. 算法实例创建(Line 600-607, 701-708)
   - 从配置读取 leave_countdown_sec(默认300秒)

5. reset() 方法
   - 清理新增状态变量

影响范围:
- 告警时机:从离岗确认后立即告警 → 倒计时结束后告警
- 持续时长:现在必然 >= 330秒(30s确认 + 300s倒计时)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-12 15:06:46 +08:00

819 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import sys
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
import redis
logger = logging.getLogger(__name__)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class LeavePostAlgorithm:
STATE_WAITING = "WAITING"
STATE_CONFIRMING = "CONFIRMING"
STATE_ON_DUTY = "ON_DUTY"
STATE_LEAVING = "LEAVING"
STATE_OFF_DUTY = "OFF_DUTY"
STATE_NON_WORK_TIME = "NON_WORK_TIME"
def __init__(
self,
confirm_on_duty_sec: int = 10,
confirm_leave_sec: int = 30,
leave_countdown_sec: int = 300, # 新增离岗倒计时默认5分钟
cooldown_sec: int = 600,
working_hours: Optional[List[Dict]] = None,
target_class: Optional[str] = "person",
):
self.confirm_on_duty_sec = confirm_on_duty_sec
self.confirm_leave_sec = confirm_leave_sec
self.leave_countdown_sec = leave_countdown_sec # 离岗倒计时
self.cooldown_sec = cooldown_sec
self.working_hours = working_hours or []
self.target_class = target_class
self.alert_cooldowns: Dict[str, datetime] = {}
self.state: str = self.STATE_WAITING
self.state_start_time: Optional[datetime] = None
self.detection_history: deque = deque()
self.alarm_sent: bool = False
self.last_person_time: Optional[datetime] = None
self._last_alarm_id: Optional[str] = None # 最近一次告警ID
self._off_duty_start_time: Optional[datetime] = None # OFF_DUTY状态开始时间用于倒计时
self._alarm_triggered: bool = False # 是否已触发告警OFF_DUTY期间只告警一次
self._leave_start_time: Optional[datetime] = None # 离岗开始时间LEAVING 状态的开始时间)
def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
if not self.working_hours:
return True
import json
working_hours = self.working_hours
if isinstance(working_hours, str):
try:
working_hours = json.loads(working_hours)
except:
return True
if not working_hours:
return True
dt = dt or datetime.now()
current_minutes = dt.hour * 60 + dt.minute
for period in working_hours:
start_str = period["start"] if isinstance(period, dict) else period
end_str = period["end"] if isinstance(period, dict) else period
start_minutes = self._parse_time_to_minutes(start_str)
end_minutes = self._parse_time_to_minutes(end_str)
if start_minutes <= current_minutes < end_minutes:
return True
return False
def _parse_time_to_minutes(self, time_str: str) -> int:
"""将时间字符串转换为分钟数"""
if isinstance(time_str, int):
return time_str
try:
parts = time_str.split(":")
return int(parts[0]) * 60 + int(parts[1])
except:
return 0
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
if not target_class:
return True
return detection.get("class") == target_class
def _get_detection_window(self, current_time: datetime) -> List[bool]:
detections = []
while self.detection_history and (current_time - self.detection_history[0][0]).total_seconds() > max(self.confirm_on_duty_sec, self.confirm_leave_sec):
self.detection_history.popleft()
for _, has_person in self.detection_history:
detections.append(has_person)
return detections
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id):
return det.get("bbox", [])
return []
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
current_time = current_time or datetime.now()
in_work = self._is_in_working_hours(current_time)
alerts = []
if not in_work:
if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time:
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "non_work_time",
})
self._last_alarm_id = None
self._leave_start_time = None
self.state = self.STATE_NON_WORK_TIME
self.detection_history.clear()
self.alarm_sent = False
return alerts
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_WAITING
self.state_start_time = None
self.detection_history.clear()
self.alarm_sent = False
self.alert_cooldowns.clear() # 新工作时段,清除冷却记录
roi_has_person = False
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
roi_has_person = True
break
if self.state == self.STATE_WAITING:
if roi_has_person:
# 检测到人,进入上岗确认阶段
self.state = self.STATE_CONFIRMING
self.state_start_time = current_time
self.detection_history.clear()
self.detection_history.append((current_time, True))
else:
pass
elif self.state == self.STATE_CONFIRMING:
self.detection_history.append((current_time, roi_has_person))
if not roi_has_person:
# 人消失,回到等待状态
self.state = self.STATE_WAITING
self.state_start_time = None
self.detection_history.clear()
else:
elapsed = (current_time - self.state_start_time).total_seconds()
if elapsed >= self.confirm_on_duty_sec:
# 持续在岗达到确认时长,正式确认上岗
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
# 确认在岗后清除冷却记录,允许新一轮离岗检测告警
self.alert_cooldowns.clear()
elif self.state == self.STATE_ON_DUTY:
self.detection_history.append((current_time, roi_has_person))
if not roi_has_person:
self.state = self.STATE_LEAVING
self.state_start_time = current_time
elif self.state == self.STATE_LEAVING:
self.detection_history.append((current_time, roi_has_person))
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人回来了回到ON_DUTY
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
elif elapsed >= self.confirm_leave_sec:
# 离岗确认期结束 → 进入OFF_DUTY开始倒计时暂不触发告警
leaving_start_time = self.state_start_time # 保存人员离开时间
self.state = self.STATE_OFF_DUTY
self.state_start_time = current_time
self._off_duty_start_time = current_time # 记录OFF_DUTY开始时间用于倒计时
self._leave_start_time = leaving_start_time # 保存LEAVING开始时间人员离开时间
self._alarm_triggered = False # 重置告警标志
elif self.state == self.STATE_OFF_DUTY:
# OFF_DUTY 状态:离岗倒计时 + 等待人员回岗
off_duty_elapsed = (current_time - self._off_duty_start_time).total_seconds() if self._off_duty_start_time else 0
if roi_has_person:
# 人员回岗
if self._last_alarm_id and self._leave_start_time and self._alarm_triggered:
# 如果已经触发过告警发送resolve事件
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "person_returned",
})
# 清理告警追踪信息
self._last_alarm_id = None
self._leave_start_time = None
self._off_duty_start_time = None
self._alarm_triggered = False
# 回到确认在岗状态
self.state = self.STATE_CONFIRMING
self.state_start_time = current_time
self.detection_history.clear()
self.detection_history.append((current_time, True))
elif off_duty_elapsed >= self.leave_countdown_sec and not self._alarm_triggered:
# 离岗倒计时结束,触发告警
cooldown_key = f"{camera_id}_{roi_id}"
now = datetime.now()
if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
bbox = self._get_latest_bbox(tracks, roi_id)
# 计算总离岗时长(从人离开到现在)
total_leave_duration = (current_time - self._leave_start_time).total_seconds() if self._leave_start_time else 0
elapsed_minutes = int(total_leave_duration / 60)
alerts.append({
"track_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"duration_minutes": elapsed_minutes,
"alert_type": "leave_post",
"message": f"离岗 {elapsed_minutes} 分钟",
})
self.alert_cooldowns[cooldown_key] = now
self._alarm_triggered = True # 标记已触发告警
# alarm_id 由 main.py 通过 set_last_alarm_id() 回填
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
self.state = self.STATE_WAITING
self.state_start_time = None
self.detection_history.clear()
self.alarm_sent = False
self.last_person_time = None
self.alert_cooldowns.clear()
self._last_alarm_id = None
self._leave_start_time = None
self._off_duty_start_time = None
self._alarm_triggered = False
def get_state(self, roi_id: str) -> Dict[str, Any]:
return {
"state": self.state,
"alarm_sent": self.alarm_sent,
"last_person_time": self.last_person_time,
}
class IntrusionAlgorithm:
def __init__(
self,
cooldown_seconds: int = 300,
confirm_seconds: int = 5,
target_class: Optional[str] = None,
):
self.cooldown_seconds = cooldown_seconds
self.confirm_seconds = confirm_seconds
self.target_class = target_class
self.last_alert_time: Dict[str, datetime] = {}
self.alert_triggered: Dict[str, bool] = {}
self.detection_start: Dict[str, Optional[datetime]] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
if not target_class:
return True
return detection.get("class") == target_class
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id):
return det.get("bbox", [])
return []
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
current_time = current_time or datetime.now()
key = f"{camera_id}_{roi_id}"
roi_has_person = False
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
roi_has_person = True
break
if not roi_has_person:
self.detection_start.pop(key, None)
self.alert_triggered[key] = False
return []
if self.alert_triggered.get(key, False):
elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds()
if elapsed_since_alert < self.cooldown_seconds:
return []
self.alert_triggered[key] = False
if self.detection_start.get(key) is None:
self.detection_start[key] = current_time
elapsed = (current_time - self.detection_start[key]).total_seconds()
if elapsed < self.confirm_seconds:
return []
bbox = self._get_latest_bbox(tracks, roi_id)
self.last_alert_time[key] = current_time
self.alert_triggered[key] = True
self.detection_start[key] = None
return [{
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": 3,
"message": "检测到周界入侵",
}]
def reset(self):
self.last_alert_time.clear()
self.alert_triggered.clear()
self.detection_start.clear()
# class CrowdDetectionAlgorithm:
# """人群聚集检测算法 - 暂时注释,后续需要时再启用"""
#
# def __init__(
# self,
# max_count: int = 10,
# cooldown_seconds: int = 300,
# target_class: Optional[str] = "person",
# ):
# self.max_count = max_count
# self.cooldown_seconds = cooldown_seconds
# self.target_class = target_class
#
# self.last_alert_time: Dict[str, datetime] = {}
# self.alert_triggered: Dict[str, bool] = {}
#
# def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
# matched_rois = detection.get("matched_rois", [])
# for roi in matched_rois:
# if roi.get("roi_id") == roi_id:
# return True
# return False
#
# def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool:
# if not target_class:
# return True
# return detection.get("class") == target_class
#
# def _get_bboxes(self, tracks: List[Dict], roi_id: str) -> List[List[float]]:
# bboxes = []
# for det in tracks:
# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
# bboxes.append(det.get("bbox", []))
# return bboxes
#
# def process(
# self,
# roi_id: str,
# camera_id: str,
# tracks: List[Dict],
# current_time: Optional[datetime] = None,
# ) -> List[Dict]:
# current_time = current_time or datetime.now()
# key = f"{camera_id}_{roi_id}"
#
# person_count = 0
# for det in tracks:
# if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
# person_count += 1
#
# if person_count <= self.max_count:
# self.alert_triggered[key] = False
# return []
#
# if self.alert_triggered.get(key, False):
# elapsed_since_alert = (current_time - self.last_alert_time.get(key, datetime.min)).total_seconds()
# if elapsed_since_alert < self.cooldown_seconds:
# return []
# self.alert_triggered[key] = False
#
# bboxes = self._get_bboxes(tracks, roi_id)
# self.last_alert_time[key] = current_time
# self.alert_triggered[key] = True
#
# return [{
# "roi_id": roi_id,
# "camera_id": camera_id,
# "bbox": bboxes[0] if bboxes else [],
# "alert_type": "crowd_detection",
# "message": f"检测到人群聚集,当前人数: {person_count}",
# "count": person_count,
# }]
#
# def reset(self):
# self.last_alert_time.clear()
# self.alert_triggered.clear()
class AlgorithmManager:
def __init__(self, working_hours: Optional[List[Dict]] = None):
self.algorithms: Dict[str, Dict[str, Any]] = {}
self.working_hours = working_hours or []
self._update_lock = threading.Lock()
self._registered_keys: set = set() # 已注册的 (roi_id, bind_id, algo_type) 缓存
self.default_params = {
"leave_post": {
"confirm_on_duty_sec": 10,
"confirm_leave_sec": 30,
"cooldown_sec": 600,
"target_class": "person",
},
"intrusion": {
"cooldown_seconds": 300,
"confirm_seconds": 5,
"target_class": None,
},
# "crowd_detection": {
# "max_count": 10,
# "cooldown_seconds": 300,
# "target_class": "person",
# },
}
self._pubsub = None
self._pubsub_thread = None
self._running = False
def start_config_subscription(self):
"""启动配置变更订阅"""
try:
from config.settings import get_settings
settings = get_settings()
if settings.config_sync_mode != "REDIS":
logger.info("CONFIG_SYNC_MODE=LOCAL: 跳过 Redis 配置订阅")
return
redis_client = redis.Redis(
host=settings.redis.host,
port=settings.redis.port,
db=settings.redis.db,
password=settings.redis.password,
decode_responses=True,
)
self._pubsub = redis_client.pubsub()
self._pubsub.subscribe("config_update")
self._running = True
self._pubsub_thread = threading.Thread(
target=self._config_update_worker,
name="ConfigUpdateSub",
daemon=True
)
self._pubsub_thread.start()
logger.info("已启动配置变更订阅")
except Exception as e:
logger.error(f"启动配置订阅失败: {e}")
def _config_update_worker(self):
"""配置更新订阅工作线程"""
try:
for message in self._pubsub.listen():
if not self._running:
break
if message["type"] == "message":
try:
import json
data = json.loads(message["data"])
update_type = data.get("type", "full")
if update_type == "roi":
roi_ids = data.get("ids", [])
if roi_ids:
for roi_id in roi_ids:
self.reload_algorithm(roi_id)
else:
self.reload_all_algorithms()
elif update_type == "bind":
bind_ids = data.get("ids", [])
if bind_ids:
for bind_id in bind_ids:
self.reload_bind_algorithm(bind_id)
else:
self.reload_all_algorithms()
else:
# type="full" / "camera" / unknown → 全量重载
self.reload_all_algorithms()
except Exception as e:
logger.error(f"处理配置更新消息失败: {e}")
except Exception as e:
logger.error(f"配置订阅线程异常: {e}")
def stop_config_subscription(self):
"""停止配置变更订阅"""
self._running = False
if self._pubsub:
self._pubsub.close()
if self._pubsub_thread and self._pubsub_thread.is_alive():
self._pubsub_thread.join(timeout=5)
logger.info("配置订阅已停止")
def load_bind_from_redis(self, bind_id: str) -> bool:
"""从Redis加载单个绑定配置的算法"""
try:
from core.config_sync import get_config_sync_manager
config_manager = get_config_sync_manager()
bind_config = config_manager.get_algo_bind_from_redis(bind_id)
if not bind_config:
return False
with self._update_lock:
roi_id = bind_config.get("roi_id")
algo_code = bind_config.get("algo_code", "leave_post")
raw_params = bind_config.get("params")
if isinstance(raw_params, str):
try:
import json
params = json.loads(raw_params) or {}
except Exception:
params = {}
elif isinstance(raw_params, dict):
params = raw_params
else:
params = {}
if roi_id not in self.algorithms:
self.algorithms[roi_id] = {}
key = f"{roi_id}_{bind_id}"
if algo_code == "leave_post":
algo_params = {
"confirm_on_duty_sec": params.get("confirm_on_duty_sec", 10),
"confirm_leave_sec": params.get("confirm_leave_sec", 30),
"cooldown_sec": params.get("cooldown_sec", 600),
"working_hours": params.get("working_hours", []),
"target_class": params.get("target_class", bind_config.get("target_class", "person")),
}
if key in self.algorithms.get(roi_id, {}) and "leave_post" in self.algorithms[roi_id].get(key, {}):
algo = self.algorithms[roi_id][key]["leave_post"]
algo.confirm_on_duty_sec = algo_params["confirm_on_duty_sec"]
algo.confirm_leave_sec = algo_params["confirm_leave_sec"]
algo.cooldown_sec = algo_params["cooldown_sec"]
algo.target_class = algo_params["target_class"]
if algo_params["working_hours"]:
algo.working_hours = algo_params["working_hours"]
logger.info(f"已热更新算法参数: {key}")
else:
self.algorithms[roi_id][key] = {}
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
confirm_on_duty_sec=algo_params["confirm_on_duty_sec"],
confirm_leave_sec=algo_params["confirm_leave_sec"],
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时默认5分钟
cooldown_sec=algo_params["cooldown_sec"],
working_hours=algo_params["working_hours"],
target_class=algo_params["target_class"],
)
logger.info(f"已从Redis加载算法: {key}")
elif algo_code == "intrusion":
algo_params = {
"cooldown_seconds": params.get("cooldown_seconds", 300),
"confirm_seconds": params.get("confirm_seconds", 5),
"target_class": params.get("target_class", bind_config.get("target_class")),
}
self.algorithms[roi_id][key] = {}
self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm(
cooldown_seconds=algo_params["cooldown_seconds"],
confirm_seconds=algo_params["confirm_seconds"],
target_class=algo_params["target_class"],
)
logger.info(f"已从Redis加载算法: {key}")
return True
except Exception as e:
logger.error(f"从Redis加载算法配置失败: {e}")
return False
def reload_bind_algorithm(self, bind_id: str) -> bool:
"""重新加载单个绑定的算法配置"""
return self.load_bind_from_redis(bind_id)
def reload_algorithm(self, roi_id: str) -> bool:
"""重新加载单个ROI的所有算法绑定配置"""
try:
from core.config_sync import get_config_sync_manager
config_manager = get_config_sync_manager()
bindings = config_manager.get_bindings_from_redis(roi_id)
if not bindings:
return False
for bind in bindings:
bind_id = bind.get("bind_id")
self.reset_algorithm(roi_id, bind_id)
self.load_bind_from_redis(bind_id)
return True
except Exception as e:
logger.error(f"重新加载ROI算法配置失败: {e}")
return False
def reload_all_algorithms(self) -> int:
"""重新加载所有算法配置"""
count = 0
try:
from core.config_sync import get_config_sync_manager
config_manager = get_config_sync_manager()
bindings = config_manager.get_bindings_from_redis("")
for bind in bindings:
bind_id = bind.get("bind_id")
roi_id = bind.get("roi_id")
self.reset_algorithm(roi_id, bind_id)
if self.load_bind_from_redis(bind_id):
count += 1
logger.info(f"已重新加载 {count} 个算法配置")
return count
except Exception as e:
logger.error(f"重新加载所有算法配置失败: {e}")
return count
def register_algorithm(
self,
roi_id: str,
bind_id: str,
algorithm_type: str,
params: Optional[Dict[str, Any]] = None,
):
"""注册算法支持绑定ID使用缓存避免每帧重复查询"""
cache_key = (roi_id, bind_id, algorithm_type)
# 快速路径:已注册直接返回
if cache_key in self._registered_keys:
return
key = f"{roi_id}_{bind_id}"
if roi_id not in self.algorithms:
self.algorithms[roi_id] = {}
if key not in self.algorithms[roi_id]:
self.algorithms[roi_id][key] = {}
algo_params = self.default_params.get(algorithm_type, {}).copy()
if params:
algo_params.update(params)
if algorithm_type == "leave_post":
roi_working_hours = algo_params.get("working_hours") or self.working_hours
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10),
confirm_leave_sec=algo_params.get("confirm_leave_sec", 30),
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时默认5分钟
cooldown_sec=algo_params.get("cooldown_sec", 600),
working_hours=roi_working_hours,
target_class=algo_params.get("target_class", "person"),
)
elif algorithm_type == "intrusion":
self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm(
cooldown_seconds=algo_params.get("cooldown_seconds", 300),
confirm_seconds=algo_params.get("confirm_seconds", 5),
target_class=algo_params.get("target_class"),
)
# elif algorithm_type == "crowd_detection":
# from algorithms import CrowdDetectionAlgorithm
# self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm(
# max_count=algo_params.get("max_count", 10),
# cooldown_seconds=algo_params.get("cooldown_seconds", 300),
# target_class=algo_params.get("target_class", "person"),
# )
self._registered_keys.add(cache_key)
def process(
self,
roi_id: str,
bind_id: str,
camera_id: str,
algorithm_type: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理检测结果支持绑定ID"""
key = f"{roi_id}_{bind_id}"
algo = self.algorithms.get(roi_id, {}).get(key, {}).get(algorithm_type)
if algo is None:
return []
return algo.process(roi_id, camera_id, tracks, current_time)
def update_roi_params(
self,
roi_id: str,
bind_id: str,
algorithm_type: str,
params: Dict[str, Any],
):
"""更新算法参数支持绑定ID"""
key = f"{roi_id}_{bind_id}"
if roi_id in self.algorithms and key in self.algorithms[roi_id] and algorithm_type in self.algorithms[roi_id][key]:
algo = self.algorithms[roi_id][key][algorithm_type]
for param_key, value in params.items():
if hasattr(algo, param_key):
setattr(algo, param_key, value)
def reset_algorithm(self, roi_id: str, bind_id: Optional[str] = None):
"""重置算法状态支持绑定ID"""
if roi_id not in self.algorithms:
return
if bind_id:
key = f"{roi_id}_{bind_id}"
if key in self.algorithms[roi_id]:
for algo in self.algorithms[roi_id][key].values():
algo.reset()
# 清除注册缓存
self._registered_keys = {
k for k in self._registered_keys
if not (k[0] == roi_id and k[1] == bind_id)
}
else:
for key in self.algorithms[roi_id]:
for algo in self.algorithms[roi_id][key].values():
algo.reset()
# 清除该 roi 的所有注册缓存
self._registered_keys = {
k for k in self._registered_keys if k[0] != roi_id
}
def reset_all(self):
"""重置所有算法"""
for roi_algorithms in self.algorithms.values():
for bind_algorithms in roi_algorithms.values():
for algo in bind_algorithms.values():
algo.reset()
def remove_roi(self, roi_id: str):
"""移除ROI的所有算法"""
if roi_id in self.algorithms:
for key in list(self.algorithms[roi_id].keys()):
self.reset_algorithm(roi_id, key.split("_")[-1] if "_" in key else None)
del self.algorithms[roi_id]
def remove_bind(self, roi_id: str, bind_id: str):
"""移除绑定的算法"""
key = f"{roi_id}_{bind_id}"
if roi_id in self.algorithms and key in self.algorithms[roi_id]:
for algo in self.algorithms[roi_id][key].values():
algo.reset()
del self.algorithms[roi_id][key]
def get_status(self, roi_id: str) -> Dict[str, Any]:
"""获取算法状态"""
status = {}
if roi_id in self.algorithms:
for key, bind_algorithms in self.algorithms[roi_id].items():
bind_id = key.split("_", 1)[-1] if "_" in key else ""
for algo_type, algo in bind_algorithms.items():
if algo_type == "leave_post":
status[f"{algo_type}_{bind_id}"] = {
"state": getattr(algo, "state", "WAITING"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
else:
status[f"{algo_type}_{bind_id}"] = {
"detection_count": len(getattr(algo, "detection_start", {})),
}
return status