fix:修复参数顺序错误、ROI 匹配失效、状态机缺失 INIT 状态
Some checks failed
Python Test / test (push) Has been cancelled

- 调整 process() 函数参数顺序,确保 roi_id 和 camera_id 正确对应
- 重构 ROI 匹配逻辑,使用明确的 roi_id 进行区域归属判断
- 引入 INIT 状态:启动时若 ROI 无人,先进入 INIT,
  经 off_duty_confirm_sec 确认后才进入 OFF_DUTY 倒计时
This commit is contained in:
2026-01-22 15:08:28 +08:00
parent 6116f0b982
commit 6fc17ccf64

View File

@@ -16,6 +16,7 @@ class LeavePostAlgorithm:
STATE_ON_DUTY = "ON_DUTY"
STATE_OFF_DUTY_COUNTDOWN = "OFF_DUTY_COUNTDOWN"
STATE_NON_WORK_TIME = "NON_WORK_TIME"
STATE_INIT = "INIT"
def __init__(
self,
@@ -32,11 +33,13 @@ class LeavePostAlgorithm:
self.alert_cooldowns: Dict[str, datetime] = {}
self.cooldown_seconds = 300
self.state: str = self.STATE_ON_DUTY
self.state: str = self.STATE_INIT
self.state_start_time: Optional[datetime] = None
self.on_duty_window = deque()
self.alarm_sent: bool = False
self.last_person_seen_time: Optional[datetime] = None
self.last_detection_time: Optional[datetime] = None
self.init_start_time: Optional[datetime] = None
def is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
if not self.working_hours:
@@ -62,64 +65,95 @@ class LeavePostAlgorithm:
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
current_time = current_time or datetime.now()
roi_has_person = False
for det in tracks:
if self.check_detection_in_roi(det, roi_id):
roi_has_person = True
break
in_work = self.is_in_working_hours(current_time)
alerts = []
if not in_work:
self.state = self.STATE_NON_WORK_TIME
self.last_person_seen_time = None
self.last_detection_time = None
self.on_duty_window.clear()
self.alarm_sent = False
self.init_start_time = None
else:
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_ON_DUTY
self.state = self.STATE_INIT
self.init_start_time = current_time
self.on_duty_window.clear()
self.alarm_sent = False
roi_has_person = False
for det in tracks:
if self.check_detection_in_roi(det, camera_id):
roi_has_person = True
break
if self.state == self.STATE_ON_DUTY:
self.on_duty_window.append((current_time, roi_has_person))
while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
self.on_duty_window.popleft()
hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1)
if not roi_has_person and hit_ratio == 0:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
if self.state == self.STATE_INIT:
if roi_has_person:
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self.alarm_sent = False
elif hit_ratio >= 0.75 and (current_time - self.on_duty_window[0][0]).total_seconds() >= self.confirm_sec:
self.on_duty_window.clear()
self.on_duty_window.append((current_time, True))
self.last_person_seen_time = current_time
self.last_detection_time = current_time
self.init_start_time = None
else:
if self.init_start_time is None:
self.init_start_time = current_time
elapsed_since_init = (current_time - self.init_start_time).total_seconds()
if elapsed_since_init >= self.threshold_sec:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
elif self.state == self.STATE_ON_DUTY:
if roi_has_person:
self.last_person_seen_time = current_time
self.last_detection_time = current_time
self.on_duty_window.append((current_time, True))
while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
self.on_duty_window.popleft()
else:
self.on_duty_window.append((current_time, False))
while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
self.on_duty_window.popleft()
hit_ratio = sum(1 for t, detected in self.on_duty_window if detected) / max(len(self.on_duty_window), 1)
if hit_ratio == 0:
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
self.alarm_sent = False
elif self.state == self.STATE_OFF_DUTY_COUNTDOWN:
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
self.on_duty_window.clear()
self.on_duty_window.append((current_time, True))
self.last_person_seen_time = current_time
self.alarm_sent = False
elif elapsed >= self.threshold_sec:
if not self.alarm_sent:
cooldown_key = f"{camera_id}"
cooldown_key = f"{roi_id}"
if cooldown_key not in self.alert_cooldowns or (
current_time - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
bbox = self.get_latest_bbox_in_roi(tracks, camera_id)
bbox = self.get_latest_bbox_in_roi(tracks, roi_id)
elapsed_minutes = int(elapsed / 60)
alerts.append({
"track_id": camera_id,
"track_id": roi_id,
"bbox": bbox,
"off_duty_duration": elapsed,
"alert_type": "leave_post",
@@ -137,11 +171,13 @@ class LeavePostAlgorithm:
return []
def reset(self):
self.state = self.STATE_ON_DUTY
self.state = self.STATE_INIT
self.state_start_time = None
self.on_duty_window.clear()
self.alarm_sent = False
self.last_person_seen_time = None
self.last_detection_time = None
self.init_start_time = None
self.alert_cooldowns.clear()
def get_state(self, track_id: str) -> Optional[Dict[str, Any]]:
@@ -279,7 +315,7 @@ class AlgorithmManager:
algo = self.algorithms.get(roi_id, {}).get(algorithm_type)
if algo is None:
return []
return algo.process(camera_id, tracks, current_time)
return algo.process(roi_id, camera_id, tracks, current_time)
def update_roi_params(
self,