diff --git a/algorithms.py b/algorithms.py index eff814f..b1e5da9 100644 --- a/algorithms.py +++ b/algorithms.py @@ -47,6 +47,9 @@ class LeavePostAlgorithm: self.alarm_sent: bool = False self.last_person_time: Optional[datetime] = None + self._last_alarm_id: Optional[str] = None # 最近一次告警ID + self._leave_start_time: Optional[datetime] = None # 离岗开始时间(LEAVING 状态的开始时间) + def _is_in_working_hours(self, dt: Optional[datetime] = None) -> bool: if not self.working_hours: return True @@ -123,15 +126,28 @@ class LeavePostAlgorithm: alerts = [] if not in_work: + if self.state == self.STATE_OFF_DUTY and self._last_alarm_id and self._leave_start_time: + duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.isoformat(), + "resolve_type": "non_work_time", + }) + self._last_alarm_id = None + self._leave_start_time = None self.state = self.STATE_NON_WORK_TIME self.detection_history.clear() self.alarm_sent = False - return [] + return alerts if self.state == self.STATE_NON_WORK_TIME: self.state = self.STATE_WAITING + self.state_start_time = None self.detection_history.clear() self.alarm_sent = False + self.alert_cooldowns.clear() # 新工作时段,清除冷却记录 roi_has_person = False for det in tracks: @@ -162,6 +178,8 @@ class LeavePostAlgorithm: # 持续在岗达到确认时长,正式确认上岗 self.state = self.STATE_ON_DUTY self.state_start_time = current_time + # 确认在岗后清除冷却记录,允许新一轮离岗检测告警 + self.alert_cooldowns.clear() elif self.state == self.STATE_ON_DUTY: self.detection_history.append((current_time, roi_has_person)) @@ -178,6 +196,7 @@ class LeavePostAlgorithm: self.state_start_time = current_time elif elapsed >= self.confirm_leave_sec: # 确认离岗后直接触发告警,不再进入 OFF_DUTY 二次等待 + leaving_start_time = self.state_start_time # 保存 LEAVING 状态开始时间(人员离开时间) self.state = self.STATE_OFF_DUTY self.state_start_time = current_time @@ -196,30 +215,37 @@ class LeavePostAlgorithm: }) self.alert_cooldowns[cooldown_key] = now + # 保存告警追踪信息(alarm_id 由 main.py 通过 set_last_alarm_id() 回填) + self._last_alarm_id = None + self._leave_start_time = leaving_start_time # LEAVING 状态开始时间 = 人员离开时间 + elif self.state == self.STATE_OFF_DUTY: - # OFF_DUTY 状态:等待人员回岗或冷却后可再次告警 + # OFF_DUTY 状态:只等待人员回岗,不再重复告警 + # 必须经过 CONFIRMING 重新确认在岗后,才允许新一轮离岗检测 if roi_has_person: - self.state = self.STATE_ON_DUTY - self.state_start_time = current_time - else: - elapsed = (current_time - self.state_start_time).total_seconds() - cooldown_key = f"{camera_id}_{roi_id}" - now = datetime.now() - if cooldown_key in self.alert_cooldowns and (now - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: - bbox = self._get_latest_bbox(tracks, roi_id) - elapsed_minutes = int(elapsed / 60) + # 生成 resolve 事件(人员回岗) + if self._last_alarm_id and self._leave_start_time: + duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000) alerts.append({ - "track_id": roi_id, - "camera_id": camera_id, - "bbox": bbox, - "duration_minutes": elapsed_minutes, - "alert_type": "leave_post", - "message": f"持续离岗 {elapsed_minutes} 分钟", + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.isoformat(), + "resolve_type": "person_returned", }) - self.alert_cooldowns[cooldown_key] = now + self._last_alarm_id = None + self._leave_start_time = None + self.state = self.STATE_CONFIRMING + self.state_start_time = current_time + self.detection_history.clear() + self.detection_history.append((current_time, True)) return alerts + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + def reset(self): self.state = self.STATE_WAITING self.state_start_time = None @@ -227,6 +253,8 @@ class LeavePostAlgorithm: self.alarm_sent = False self.last_person_time = None self.alert_cooldowns.clear() + self._last_alarm_id = None + self._leave_start_time = None def get_state(self, roi_id: str) -> Dict[str, Any]: return { diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index d9272f4..66c4016 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -173,6 +173,11 @@ class AlarmUploadWorker: self._logger.error(f"告警 JSON 解析失败: {e}") return + # 如果是 resolve 事件,走单独的处理逻辑 + if alarm_data.get("_type") == "resolve": + self._process_resolve(alarm_data) + return + alarm_id = alarm_data.get("alarm_id", "unknown") retry_count = alarm_data.get("_retry_count", 0) @@ -241,6 +246,36 @@ class AlarmUploadWorker: # HTTP 上报失败,进入重试 self._handle_retry(alarm_json, "HTTP 上报失败") + def _process_resolve(self, resolve_data: dict): + """处理告警结束事件 - HTTP POST 到云端""" + upload_cfg = self._settings.alarm_upload + base_url = upload_cfg.cloud_api_url.rstrip("/") + url = f"{base_url}/admin-api/aiot/alarm/edge/resolve" + + headers = {"Content-Type": "application/json"} + if upload_cfg.edge_token: + headers["Authorization"] = f"Bearer {upload_cfg.edge_token}" + + payload = { + "alarm_id": resolve_data.get("alarm_id"), + "duration_ms": resolve_data.get("duration_ms"), + "last_frame_time": resolve_data.get("last_frame_time"), + "resolve_type": resolve_data.get("resolve_type"), + } + + try: + response = requests.post(url, json=payload, headers=headers, timeout=10) + if response.status_code == 200: + body = response.json() + if body.get("code") == 0: + self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}") + else: + self._logger.warning(f"告警结束上报业务错误: {body}") + else: + self._logger.warning(f"告警结束上报失败: status={response.status_code}") + except Exception as e: + self._logger.warning(f"告警结束上报异常: {e}") + def _upload_snapshot_to_cos( self, local_path: str, alarm_id: str, device_id: str ) -> Optional[str]: diff --git a/core/result_reporter.py b/core/result_reporter.py index b622ec6..53a3e53 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -174,6 +174,21 @@ class ResultReporter: self._logger.error(f"写入 Redis 队列失败: {e}") return False + def report_alarm_resolve(self, resolve_data: dict) -> bool: + """上报告警结束事件(写入 Redis 队列)""" + if self._redis is None: + self._logger.error("Redis 未连接,无法写入 resolve 队列") + return False + try: + resolve_data["_type"] = "resolve" # 标记类型,worker 据此分流 + resolve_json = json.dumps(resolve_data, ensure_ascii=False) + self._redis.lpush(REDIS_KEY_ALARM_PENDING, resolve_json) + self._logger.info(f"告警结束事件已入队: alarm_id={resolve_data.get('alarm_id')}") + return True + except Exception as e: + self._logger.error(f"写入 resolve 队列失败: {e}") + return False + def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" stats = self._performance_stats.copy() diff --git a/main.py b/main.py index 01e8e2d..4ce769d 100644 --- a/main.py +++ b/main.py @@ -514,6 +514,23 @@ class EdgeInferenceService: for alert in alerts: alert_type = alert.get("alert_type", "detection") + # resolve 事件:更新已有告警,不创建新告警 + if alert_type == "alarm_resolve": + resolve_data = { + "alarm_id": alert.get("resolve_alarm_id"), + "duration_ms": alert.get("duration_ms"), + "last_frame_time": alert.get("last_frame_time"), + "resolve_type": alert.get("resolve_type"), + } + if self._reporter: + self._reporter.report_alarm_resolve(resolve_data) + self._logger.info( + f"离岗告警结束: alarm_id={resolve_data['alarm_id']}, " + f"duration_ms={resolve_data['duration_ms']}, " + f"reason={resolve_data['resolve_type']}" + ) + continue + # 摄像头级别去重:同一摄像头+告警类型在冷却期内只上报一次 dedup_key = f"{camera_id}_{alert_type}" now = frame.timestamp @@ -530,6 +547,13 @@ class EdgeInferenceService: self._camera_alert_cooldown[dedup_key] = now self._performance_stats["total_alerts_generated"] += 1 + # 获取算法的离岗开始时间 + leave_start_time = None + if alert_type == "leave_post": + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") + if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time: + leave_start_time = algo._leave_start_time.isoformat() + from core.result_reporter import AlarmInfo, generate_alarm_id alarm_info = AlarmInfo( alarm_id=generate_alarm_id(self._settings.mqtt.device_id), @@ -548,10 +572,17 @@ class EdgeInferenceService: "bind_id": bind.bind_id, "message": alert.get("message", ""), "edge_node_id": self._settings.mqtt.device_id, + "first_frame_time": leave_start_time, }, ) self._reporter.report_alarm(alarm_info, screenshot=frame.image) + # 回填 alarm_id 到算法实例(用于后续 resolve 追踪) + if alert_type == "leave_post": + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") + if algo and hasattr(algo, 'set_last_alarm_id'): + algo.set_last_alarm_id(alarm_info.alarm_id) + self._logger.info( f"告警已生成: type={alert_type}, " f"camera={camera_id}, roi={roi_id}, "