refactor(aiot): 离岗检测算法v2.0 - 全面重构

重构目标:
- 更清晰的状态机设计
- 滑动窗口平滑检测结果
- 更准确的告警判断逻辑

核心改进:
1. 状态机优化(6个状态 → 7个状态)
   - INIT: 初始化
   - CONFIRMING_ON_DUTY: 上岗确认中
   - ON_DUTY: 在岗
   - CONFIRMING_OFF_DUTY: 离岗确认中
   - OFF_DUTY_COUNTDOWN: 离岗倒计时
   - ALARMED: 已告警
   - NON_WORK_TIME: 非工作时间

2. 滑动窗口机制
   - 10秒滑动窗口,存储检测历史
   - 计算命中率(person_count / total_frames)
   - 上岗条件:命中率 ≥ 70%(允许30%漏检)
   - 离岗条件:命中率 = 0(窗口内完全没人)

3. 参数优化
   - confirm_on_duty_sec: 上岗确认(默认10秒)
   - confirm_off_duty_sec: 离岗确认(默认30秒)
   - confirm_return_sec: 回岗确认(默认10秒)
   - leave_countdown_sec: 离岗倒计时(默认300秒)
   - 向后兼容:confirm_leave_sec → confirm_off_duty_sec

4. 状态监控增强
   - get_state() 返回详细状态信息
   - 包含倒计时剩余时间、检测命中率等

5. 日志分级
   - INFO: 关键状态转换(确认上岗、确认离岗)
   - DEBUG: 次要状态转换(进入确认状态)
   - WARNING: 告警触发

技术细节:
- 使用deque实现O(1)滑动窗口更新
- 兼容旧参数名(confirm_leave_sec)
- 回岗自动发送resolve事件
- 非工作时间自动清理状态

影响范围:
- 告警判断更准确(抗漏检干扰)
- 状态转换更合理(细化确认流程)
- 调试更友好(详细状态信息)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-12 15:41:05 +08:00
parent 7496a6fe04
commit 5b2440c467

View File

@@ -17,44 +17,71 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class LeavePostAlgorithm: class LeavePostAlgorithm:
STATE_WAITING = "WAITING" """
STATE_CONFIRMING = "CONFIRMING" 离岗检测算法(优化版 v2.0
STATE_ON_DUTY = "ON_DUTY"
STATE_LEAVING = "LEAVING" 状态机:
STATE_OFF_DUTY = "OFF_DUTY" INIT → CONFIRMING_ON_DUTY → ON_DUTY → CONFIRMING_OFF_DUTY
STATE_NON_WORK_TIME = "NON_WORK_TIME" → OFF_DUTY_COUNTDOWN → ALARMED → (回岗) → CONFIRMING_ON_DUTY
业务流程:
1. 启动后检测到人 → 上岗确认期confirm_on_duty_sec默认10秒
2. 确认上岗后 → 在岗状态ON_DUTY
3. 人离开ROI → 离岗确认期confirm_off_duty_sec默认30秒
4. 确认离岗后 → 离岗倒计时leave_countdown_sec默认300秒
5. 倒计时结束 → 触发告警ALARMED状态
6. 人员回岗 → 回岗确认期confirm_return_sec默认10秒
7. 确认回岗 → 发送resolve事件 → 重新上岗确认
"""
# 状态定义
STATE_INIT = "INIT" # 初始化
STATE_CONFIRMING_ON_DUTY = "CONFIRMING_ON_DUTY" # 上岗确认中
STATE_ON_DUTY = "ON_DUTY" # 在岗
STATE_CONFIRMING_OFF_DUTY = "CONFIRMING_OFF_DUTY" # 离岗确认中
STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN" # 离岗倒计时中
STATE_ALARMED = "ALARMED" # 已告警(等待回岗)
STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间
def __init__( def __init__(
self, self,
confirm_on_duty_sec: int = 10, confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长)
confirm_leave_sec: int = 30, confirm_off_duty_sec: int = 30, # 离岗确认窗口(持续未检测到人的时长)
leave_countdown_sec: int = 300, # 新增离岗倒计时默认5分钟 confirm_return_sec: int = 10, # 回岗确认窗口(告警后回来需确认时长
cooldown_sec: int = 600, leave_countdown_sec: int = 300, # 离岗倒计时(确认离岗后等待告警的时长)
cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔)
working_hours: Optional[List[Dict]] = None, working_hours: Optional[List[Dict]] = None,
target_class: Optional[str] = "person", target_class: Optional[str] = "person",
# 兼容旧参数名(向后兼容)
confirm_leave_sec: Optional[int] = None,
): ):
# 时间参数(处理向后兼容)
self.confirm_on_duty_sec = confirm_on_duty_sec self.confirm_on_duty_sec = confirm_on_duty_sec
self.confirm_leave_sec = confirm_leave_sec self.confirm_off_duty_sec = confirm_leave_sec if confirm_leave_sec is not None else confirm_off_duty_sec
self.leave_countdown_sec = leave_countdown_sec # 离岗倒计时 self.confirm_return_sec = confirm_return_sec
self.leave_countdown_sec = leave_countdown_sec
self.cooldown_sec = cooldown_sec self.cooldown_sec = cooldown_sec
# 工作时间和目标类别
self.working_hours = working_hours or [] self.working_hours = working_hours or []
self.target_class = target_class self.target_class = target_class
# 状态变量
self.state: str = self.STATE_INIT
self.state_start_time: Optional[datetime] = None
# 滑动窗口(用于平滑检测结果)
self.detection_window: deque = deque() # [(timestamp, has_person), ...]
self.window_size_sec = 10 # 滑动窗口大小10秒
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._leave_start_time: Optional[datetime] = None # 人员离开时间(用于计算持续时长)
self._alarm_triggered_time: Optional[datetime] = None # 告警触发时间
self.alert_cooldowns: Dict[str, datetime] = {} self.alert_cooldowns: Dict[str, datetime] = {}
self.state: str = self.STATE_WAITING
self.state_start_time: Optional[datetime] = None
self.detection_history: deque = deque()
self.alarm_sent: bool = False
self.last_person_time: Optional[datetime] = None
self._last_alarm_id: Optional[str] = None # 最近一次告警ID
self._off_duty_start_time: Optional[datetime] = None # OFF_DUTY状态开始时间用于倒计时
self._alarm_triggered: bool = False # 是否已触发告警OFF_DUTY期间只告警一次
self._leave_start_time: Optional[datetime] = None # 离岗开始时间LEAVING 状态的开始时间)
def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
"""检查是否在工作时间"""
if not self.working_hours: if not self.working_hours:
return True return True
@@ -92,31 +119,43 @@ class LeavePostAlgorithm:
return 0 return 0
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
"""检查检测结果是否在ROI内"""
matched_rois = detection.get("matched_rois", []) matched_rois = detection.get("matched_rois", [])
for roi in matched_rois: return any(roi.get("roi_id") == roi_id for roi in matched_rois)
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_class(self, detection: Dict, target_class: Optional[str]) -> bool: def _check_target_class(self, detection: Dict, target_class: str) -> bool:
"""检查是否为目标类别"""
if not target_class: if not target_class:
return True return True
return detection.get("class") == target_class return detection.get("class") == target_class
def _get_detection_window(self, current_time: datetime) -> List[bool]:
detections = []
while self.detection_history and (current_time - self.detection_history[0][0]).total_seconds() > max(self.confirm_on_duty_sec, self.confirm_leave_sec):
self.detection_history.popleft()
for _, has_person in self.detection_history:
detections.append(has_person)
return detections
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
"""获取ROI内最新的检测框"""
for det in tracks: for det in tracks:
if self._check_detection_in_roi(det, roi_id): if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
return det.get("bbox", []) return det.get("bbox", [])
return [] return []
def _update_detection_window(self, current_time: datetime, has_person: bool):
"""更新滑动窗口"""
self.detection_window.append((current_time, has_person))
# 移除窗口外的旧数据
while self.detection_window:
oldest_time, _ = self.detection_window[0]
if (current_time - oldest_time).total_seconds() > self.window_size_sec:
self.detection_window.popleft()
else:
break
def _get_detection_ratio(self) -> float:
"""计算滑动窗口内的检测命中率"""
if not self.detection_window:
return 0.0
person_count = sum(1 for _, has_person in self.detection_window if has_person)
return person_count / len(self.detection_window)
def process( def process(
self, self,
roi_id: str, roi_id: str,
@@ -124,125 +163,135 @@ class LeavePostAlgorithm:
tracks: List[Dict], tracks: List[Dict],
current_time: Optional[datetime] = None, current_time: Optional[datetime] = None,
) -> List[Dict]: ) -> List[Dict]:
current_time = current_time or datetime.now() """
处理单帧检测结果
in_work = self._is_in_working_hours(current_time) Args:
roi_id: ROI区域ID
camera_id: 摄像头ID
tracks: 检测结果列表
current_time: 当前时间用于测试生产环境传None
Returns:
告警列表 [{"alert_type": "leave_post", ...}, {"alert_type": "alarm_resolve", ...}]
"""
current_time = current_time or datetime.now()
alerts = [] alerts = []
if not in_work: # 检查ROI内是否有目标
if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time: roi_has_person = any(
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class)
alerts.append({ for det in tracks
"alert_type": "alarm_resolve", )
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "non_work_time",
})
self._last_alarm_id = None
self._leave_start_time = None
self.state = self.STATE_NON_WORK_TIME
self.detection_history.clear()
self.alarm_sent = False
return alerts
if self.state == self.STATE_NON_WORK_TIME: # 更新滑动窗口
self.state = self.STATE_WAITING self._update_detection_window(current_time, roi_has_person)
self.state_start_time = None detection_ratio = self._get_detection_ratio()
self.detection_history.clear()
self.alarm_sent = False
self.alert_cooldowns.clear() # 新工作时段,清除冷却记录
roi_has_person = False # 检查工作时间
for det in tracks: in_working_hours = self._is_in_working_hours(current_time)
if self._check_detection_in_roi(det, roi_id) and self._check_target_class(det, self.target_class):
roi_has_person = True
break
if self.state == self.STATE_WAITING: # === 非工作时间处理 ===
if roi_has_person: if not in_working_hours:
# 检测到人,进入上岗确认阶段 if self.state != self.STATE_NON_WORK_TIME:
self.state = self.STATE_CONFIRMING # 进入非工作时间,清理状态
self.state_start_time = current_time if self._last_alarm_id and self._leave_start_time:
self.detection_history.clear() # 如果有未结束的告警发送resolve事件非工作时间自动关闭
self.detection_history.append((current_time, True))
else:
pass
elif self.state == self.STATE_CONFIRMING:
self.detection_history.append((current_time, roi_has_person))
if not roi_has_person:
# 人消失,回到等待状态
self.state = self.STATE_WAITING
self.state_start_time = None
self.detection_history.clear()
else:
elapsed = (current_time - self.state_start_time).total_seconds()
if elapsed >= self.confirm_on_duty_sec:
# 持续在岗达到确认时长,正式确认上岗
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
# 确认在岗后清除冷却记录,允许新一轮离岗检测告警
self.alert_cooldowns.clear()
elif self.state == self.STATE_ON_DUTY:
self.detection_history.append((current_time, roi_has_person))
if not roi_has_person:
self.state = self.STATE_LEAVING
self.state_start_time = current_time
elif self.state == self.STATE_LEAVING:
self.detection_history.append((current_time, roi_has_person))
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人回来了回到ON_DUTY
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
elif elapsed >= self.confirm_leave_sec:
# 离岗确认期结束 → 进入OFF_DUTY开始倒计时暂不触发告警
leaving_start_time = self.state_start_time # 保存人员离开时间
self.state = self.STATE_OFF_DUTY
self.state_start_time = current_time
self._off_duty_start_time = current_time # 记录OFF_DUTY开始时间用于倒计时
self._leave_start_time = leaving_start_time # 保存LEAVING开始时间人员离开时间
self._alarm_triggered = False # 重置告警标志
elif self.state == self.STATE_OFF_DUTY:
# OFF_DUTY 状态:离岗倒计时 + 等待人员回岗
off_duty_elapsed = (current_time - self._off_duty_start_time).total_seconds() if self._off_duty_start_time else 0
if roi_has_person:
# 人员回岗
if self._last_alarm_id and self._leave_start_time and self._alarm_triggered:
# 如果已经触发过告警发送resolve事件
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({ alerts.append({
"alert_type": "alarm_resolve", "alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id, "resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms, "duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "person_returned", "resolve_type": "non_work_time",
}) })
# 清理告警追踪信息
self.state = self.STATE_NON_WORK_TIME
self.state_start_time = None
self.detection_window.clear()
self._last_alarm_id = None self._last_alarm_id = None
self._leave_start_time = None self._leave_start_time = None
self._off_duty_start_time = None self._alarm_triggered_time = None
self._alarm_triggered = False return alerts
# 回到确认在岗状态
self.state = self.STATE_CONFIRMING # === 工作时间处理 ===
# 从非工作时间恢复
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_INIT
self.state_start_time = current_time
self.detection_window.clear()
# === 状态机处理 ===
if self.state == self.STATE_INIT:
# 初始化状态:等待检测到人
if roi_has_person:
self.state = self.STATE_CONFIRMING_ON_DUTY
self.state_start_time = current_time self.state_start_time = current_time
self.detection_history.clear() logger.debug(f"ROI {roi_id}: INIT → CONFIRMING_ON_DUTY")
self.detection_history.append((current_time, True))
elif off_duty_elapsed >= self.leave_countdown_sec and not self._alarm_triggered: elif self.state == self.STATE_CONFIRMING_ON_DUTY:
# 离岗倒计时结束,触发告警 # 上岗确认中:需要持续检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if detection_ratio == 0:
# 人消失了回到INIT
self.state = self.STATE_INIT
self.state_start_time = current_time
self.detection_window.clear()
logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)")
elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.7:
# 上岗确认成功(命中率>=70%
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self.alert_cooldowns.clear() # 确认在岗后清除冷却记录
logger.info(f"ROI {roi_id}: CONFIRMING_ON_DUTY → ON_DUTY (上岗确认成功)")
elif self.state == self.STATE_ON_DUTY:
# 在岗状态:监控是否离岗
if detection_ratio == 0:
# 滑动窗口内完全没有人,进入离岗确认
self.state = self.STATE_CONFIRMING_OFF_DUTY
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY")
elif self.state == self.STATE_CONFIRMING_OFF_DUTY:
# 离岗确认中:需要持续未检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人回来了回到ON_DUTY
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了)")
elif elapsed >= self.confirm_off_duty_sec and detection_ratio == 0:
# 离岗确认成功,进入倒计时
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self._leave_start_time = self.state_start_time # 记录离开时间
logger.info(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → OFF_DUTY_COUNTDOWN (离岗确认成功)")
elif self.state == self.STATE_OFF_DUTY_COUNTDOWN:
# 离岗倒计时中:等待告警触发
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 倒计时期间人回来了回到ON_DUTY未触发告警
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self._leave_start_time = None
logger.info(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ON_DUTY (倒计时期间回来)")
elif elapsed >= self.leave_countdown_sec:
# 倒计时结束,触发告警
cooldown_key = f"{camera_id}_{roi_id}" cooldown_key = f"{camera_id}_{roi_id}"
now = datetime.now() if cooldown_key not in self.alert_cooldowns or \
if cooldown_key not in self.alert_cooldowns or (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
bbox = self._get_latest_bbox(tracks, roi_id) bbox = self._get_latest_bbox(tracks, roi_id)
# 计算总离岗时长(从人离开到现在) total_off_duty_sec = (current_time - self._leave_start_time).total_seconds()
total_leave_duration = (current_time - self._leave_start_time).total_seconds() if self._leave_start_time else 0 elapsed_minutes = int(total_off_duty_sec / 60)
elapsed_minutes = int(total_leave_duration / 60)
alerts.append({ alerts.append({
"track_id": roi_id, "track_id": roi_id,
"camera_id": camera_id, "camera_id": camera_id,
@@ -251,9 +300,38 @@ class LeavePostAlgorithm:
"alert_type": "leave_post", "alert_type": "leave_post",
"message": f"离岗 {elapsed_minutes} 分钟", "message": f"离岗 {elapsed_minutes} 分钟",
}) })
self.alert_cooldowns[cooldown_key] = now
self._alarm_triggered = True # 标记已触发告警 self.alert_cooldowns[cooldown_key] = current_time
# alarm_id 由 main.py 通过 set_last_alarm_id() 回填 self._alarm_triggered_time = current_time
self.state = self.STATE_ALARMED
# _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填
logger.warning(f"ROI {roi_id}: OFF_DUTY_COUNTDOWN → ALARMED (告警触发)")
elif self.state == self.STATE_ALARMED:
# 已告警状态:等待人员回岗
if roi_has_person:
# 检测到人,进入回岗确认
self.state = self.STATE_CONFIRMING_ON_DUTY # 复用上岗确认状态
self.state_start_time = current_time
logger.info(f"ROI {roi_id}: ALARMED → CONFIRMING_ON_DUTY (检测到人回岗)")
# 特殊处理从CONFIRMING_ON_DUTY再次确认上岗时如果有未结束的告警发送resolve
if self.state == self.STATE_ON_DUTY and self._last_alarm_id:
# 回岗确认成功发送resolve事件
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "person_returned",
})
# 清理告警追踪信息
self._last_alarm_id = None
self._leave_start_time = None
self._alarm_triggered_time = None
logger.info(f"ROI {roi_id}: 告警已解决(人员回岗)")
return alerts return alerts
@@ -262,24 +340,36 @@ class LeavePostAlgorithm:
self._last_alarm_id = alarm_id self._last_alarm_id = alarm_id
def reset(self): def reset(self):
self.state = self.STATE_WAITING """重置算法状态"""
self.state = self.STATE_INIT
self.state_start_time = None self.state_start_time = None
self.detection_history.clear() self.detection_window.clear()
self.alarm_sent = False
self.last_person_time = None
self.alert_cooldowns.clear()
self._last_alarm_id = None self._last_alarm_id = None
self._leave_start_time = None self._leave_start_time = None
self._off_duty_start_time = None self._alarm_triggered_time = None
self._alarm_triggered = False self.alert_cooldowns.clear()
def get_state(self, roi_id: str) -> Dict[str, Any]: def get_state(self, roi_id: str) -> Dict[str, Any]:
return { """获取当前状态(用于调试和监控)"""
state_info = {
"state": self.state, "state": self.state,
"alarm_sent": self.alarm_sent, "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"last_person_time": self.last_person_time, "detection_ratio": self._get_detection_ratio(),
"window_size": len(self.detection_window),
} }
# 添加状态特定信息
if self.state == self.STATE_OFF_DUTY_COUNTDOWN and self.state_start_time:
elapsed = (datetime.now() - self.state_start_time).total_seconds()
state_info["countdown_remaining_sec"] = max(0, self.leave_countdown_sec - elapsed)
if self.state == self.STATE_ALARMED and self._leave_start_time:
total_off_duty_sec = (datetime.now() - self._leave_start_time).total_seconds()
state_info["total_off_duty_sec"] = total_off_duty_sec
state_info["alarm_id"] = self._last_alarm_id
return state_info
class IntrusionAlgorithm: class IntrusionAlgorithm:
def __init__( def __init__(