feat(aiot): 离岗检测重写 - 单次告警 + 回岗确认 + 持续时长追踪

算法逻辑修改:
- OFF_DUTY状态只告警一次,不再每600秒重复告警
- 人员回岗后需经CONFIRMING(10秒)重新确认才恢复ON_DUTY
- 确认在岗后清除冷却记录,允许新一轮离岗检测
- 非工作时间进入时清除冷却记录

持续时长追踪(新增resolve机制):
- 离岗告警记录alarm_id和leave_start_time
- 人员回岗确认后生成resolve事件(duration_ms + last_frame_time)
- 进入非工作时间时也生成resolve事件
- ResultReporter新增report_alarm_resolve()写入Redis队列
- AlarmUploadWorker新增_process_resolve() HTTP POST到云端
- main.py区分普通告警和resolve事件,回填alarm_id到算法实例
- 告警ext_data附加first_frame_time(离岗开始时间)
This commit is contained in:
2026-02-11 17:55:35 +08:00
parent abcd40f88b
commit ecebdd514f
4 changed files with 127 additions and 18 deletions

View File

@@ -47,6 +47,9 @@ class LeavePostAlgorithm:
self.alarm_sent: bool = False
self.last_person_time: Optional[datetime] = None
self._last_alarm_id: Optional[str] = None # 最近一次告警ID
self._leave_start_time: Optional[datetime] = None # 离岗开始时间LEAVING 状态的开始时间)
def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool:
if not self.working_hours:
return True
@@ -123,15 +126,28 @@ class LeavePostAlgorithm:
alerts = []
if not in_work:
if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time:
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.isoformat(),
"resolve_type": "non_work_time",
})
self._last_alarm_id = None
self._leave_start_time = None
self.state = self.STATE_NON_WORK_TIME
self.detection_history.clear()
self.alarm_sent = False
return []
return alerts
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_WAITING
self.state_start_time = None
self.detection_history.clear()
self.alarm_sent = False
self.alert_cooldowns.clear() # 新工作时段,清除冷却记录
roi_has_person = False
for det in tracks:
@@ -162,6 +178,8 @@ class LeavePostAlgorithm:
# 持续在岗达到确认时长,正式确认上岗
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
# 确认在岗后清除冷却记录,允许新一轮离岗检测告警
self.alert_cooldowns.clear()
elif self.state == self.STATE_ON_DUTY:
self.detection_history.append((current_time, roi_has_person))
@@ -178,6 +196,7 @@ class LeavePostAlgorithm:
self.state_start_time = current_time
elif elapsed >= self.confirm_leave_sec:
# 确认离岗后直接触发告警,不再进入 OFF_DUTY 二次等待
leaving_start_time = self.state_start_time # 保存 LEAVING 状态开始时间(人员离开时间)
self.state = self.STATE_OFF_DUTY
self.state_start_time = current_time
@@ -196,30 +215,37 @@ class LeavePostAlgorithm:
})
self.alert_cooldowns[cooldown_key] = now
# 保存告警追踪信息alarm_id 由 main.py 通过 set_last_alarm_id() 回填)
self._last_alarm_id = None
self._leave_start_time = leaving_start_time # LEAVING 状态开始时间 = 人员离开时间
elif self.state == self.STATE_OFF_DUTY:
# OFF_DUTY 状态:等待人员回岗或冷却后可再次告警
# OFF_DUTY 状态:等待人员回岗,不再重复告警
# 必须经过 CONFIRMING 重新确认在岗后,才允许新一轮离岗检测
if roi_has_person:
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
else:
elapsed = (current_time - self.state_start_time).total_seconds()
cooldown_key = f"{camera_id}_{roi_id}"
now = datetime.now()
if cooldown_key in self.alert_cooldowns and (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
bbox = self._get_latest_bbox(tracks, roi_id)
elapsed_minutes = int(elapsed / 60)
# 生成 resolve 事件(人员回岗)
if self._last_alarm_id and self._leave_start_time:
duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)
alerts.append({
"track_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"duration_minutes": elapsed_minutes,
"alert_type": "leave_post",
"message": f"持续离岗 {elapsed_minutes} 分钟",
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.isoformat(),
"resolve_type": "person_returned",
})
self.alert_cooldowns[cooldown_key] = now
self._last_alarm_id = None
self._leave_start_time = None
self.state = self.STATE_CONFIRMING
self.state_start_time = current_time
self.detection_history.clear()
self.detection_history.append((current_time, True))
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
self.state = self.STATE_WAITING
self.state_start_time = None
@@ -227,6 +253,8 @@ class LeavePostAlgorithm:
self.alarm_sent = False
self.last_person_time = None
self.alert_cooldowns.clear()
self._last_alarm_id = None
self._leave_start_time = None
def get_state(self, roi_id: str) -> Dict[str, Any]:
return {

View File

@@ -173,6 +173,11 @@ class AlarmUploadWorker:
self._logger.error(f"告警 JSON 解析失败: {e}")
return
# 如果是 resolve 事件,走单独的处理逻辑
if alarm_data.get("_type") == "resolve":
self._process_resolve(alarm_data)
return
alarm_id = alarm_data.get("alarm_id", "unknown")
retry_count = alarm_data.get("_retry_count", 0)
@@ -241,6 +246,36 @@ class AlarmUploadWorker:
# HTTP 上报失败,进入重试
self._handle_retry(alarm_json, "HTTP 上报失败")
def _process_resolve(self, resolve_data: dict):
"""处理告警结束事件 - HTTP POST 到云端"""
upload_cfg = self._settings.alarm_upload
base_url = upload_cfg.cloud_api_url.rstrip("/")
url = f"{base_url}/admin-api/aiot/alarm/edge/resolve"
headers = {"Content-Type": "application/json"}
if upload_cfg.edge_token:
headers["Authorization"] = f"Bearer {upload_cfg.edge_token}"
payload = {
"alarm_id": resolve_data.get("alarm_id"),
"duration_ms": resolve_data.get("duration_ms"),
"last_frame_time": resolve_data.get("last_frame_time"),
"resolve_type": resolve_data.get("resolve_type"),
}
try:
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
body = response.json()
if body.get("code") == 0:
self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}")
else:
self._logger.warning(f"告警结束上报业务错误: {body}")
else:
self._logger.warning(f"告警结束上报失败: status={response.status_code}")
except Exception as e:
self._logger.warning(f"告警结束上报异常: {e}")
def _upload_snapshot_to_cos(
self, local_path: str, alarm_id: str, device_id: str
) -> Optional[str]:

View File

@@ -174,6 +174,21 @@ class ResultReporter:
self._logger.error(f"写入 Redis 队列失败: {e}")
return False
def report_alarm_resolve(self, resolve_data: dict) -> bool:
"""上报告警结束事件(写入 Redis 队列)"""
if self._redis is None:
self._logger.error("Redis 未连接,无法写入 resolve 队列")
return False
try:
resolve_data["_type"] = "resolve" # 标记类型worker 据此分流
resolve_json = json.dumps(resolve_data, ensure_ascii=False)
self._redis.lpush(REDIS_KEY_ALARM_PENDING, resolve_json)
self._logger.info(f"告警结束事件已入队: alarm_id={resolve_data.get('alarm_id')}")
return True
except Exception as e:
self._logger.error(f"写入 resolve 队列失败: {e}")
return False
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
stats = self._performance_stats.copy()

31
main.py
View File

@@ -514,6 +514,23 @@ class EdgeInferenceService:
for alert in alerts:
alert_type = alert.get("alert_type", "detection")
# resolve 事件:更新已有告警,不创建新告警
if alert_type == "alarm_resolve":
resolve_data = {
"alarm_id": alert.get("resolve_alarm_id"),
"duration_ms": alert.get("duration_ms"),
"last_frame_time": alert.get("last_frame_time"),
"resolve_type": alert.get("resolve_type"),
}
if self._reporter:
self._reporter.report_alarm_resolve(resolve_data)
self._logger.info(
f"离岗告警结束: alarm_id={resolve_data['alarm_id']}, "
f"duration_ms={resolve_data['duration_ms']}, "
f"reason={resolve_data['resolve_type']}"
)
continue
# 摄像头级别去重:同一摄像头+告警类型在冷却期内只上报一次
dedup_key = f"{camera_id}_{alert_type}"
now = frame.timestamp
@@ -530,6 +547,13 @@ class EdgeInferenceService:
self._camera_alert_cooldown[dedup_key] = now
self._performance_stats["total_alerts_generated"] += 1
# 获取算法的离岗开始时间
leave_start_time = None
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time:
leave_start_time = algo._leave_start_time.isoformat()
from core.result_reporter import AlarmInfo, generate_alarm_id
alarm_info = AlarmInfo(
alarm_id=generate_alarm_id(self._settings.mqtt.device_id),
@@ -548,10 +572,17 @@ class EdgeInferenceService:
"bind_id": bind.bind_id,
"message": alert.get("message", ""),
"edge_node_id": self._settings.mqtt.device_id,
"first_frame_time": leave_start_time,
},
)
self._reporter.report_alarm(alarm_info, screenshot=frame.image)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪)
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
self._logger.info(
f"告警已生成: type={alert_type}, "
f"camera={camera_id}, roi={roi_id}, "