功能:添加心跳守护线程,每30秒向 WVP 上报设备状态

- 新增 _start_heartbeat() 守护线程
- 每30秒 POST 到 WVP /api/ai/device/heartbeat
- 上报 uptime、帧数、告警数、活跃流数、配置版本
- 使用 stop_event.wait(30) 优雅退出

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-17 17:47:12 +08:00
parent ea992c6daa
commit 0d88ed7fbb

50
main.py
View File

@@ -54,6 +54,7 @@ class EdgeInferenceService:
self._debug_reload_thread: Optional[threading.Thread] = None
self._debug_http_server = None
self._debug_http_thread: Optional[threading.Thread] = None
self._heartbeat_thread: Optional[threading.Thread] = None
self._processing_threads: Dict[str, threading.Thread] = {}
self._stop_event = threading.Event()
@@ -284,6 +285,54 @@ class EdgeInferenceService:
)
self._debug_http_thread.start()
def _start_heartbeat(self):
"""启动心跳守护线程,每 30 秒向 WVP 上报设备状态"""
def worker():
import requests
base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/")
url = f"{base_url}/api/ai/device/heartbeat"
device_id = self._settings.mqtt.device_id
self._logger.info(f"[心跳] 守护线程已启动, 目标: {url}, device_id={device_id}")
while not self._stop_event.is_set():
try:
start_time = self._performance_stats.get("start_time")
uptime = (datetime.now() - start_time).total_seconds() if start_time else 0
stream_count = len(self._stream_manager._streams) if self._stream_manager else 0
config_version = self._config_manager.config_version if self._config_manager else None
payload = {
"device_id": device_id,
"status": {
"uptime_seconds": int(uptime),
"frames_processed": self._performance_stats.get("total_frames_processed", 0),
"alerts_generated": self._performance_stats.get("total_alerts_generated", 0),
"stream_count": stream_count,
"config_version": config_version,
"stream_stats": {
"active_streams": stream_count,
},
},
}
resp = requests.post(url, json=payload, timeout=10)
if resp.status_code == 200:
self._logger.debug(f"[心跳] 上报成功: uptime={int(uptime)}s, streams={stream_count}")
else:
self._logger.warning(f"[心跳] 上报失败: HTTP {resp.status_code}")
except Exception as e:
self._logger.warning(f"[心跳] 上报异常: {e}")
self._stop_event.wait(30)
self._heartbeat_thread = threading.Thread(
target=worker,
name="HeartbeatWorker",
daemon=True,
)
self._heartbeat_thread.start()
def initialize(self):
"""初始化所有组件"""
self._logger.info("=" * 50)
@@ -301,6 +350,7 @@ class EdgeInferenceService:
self._init_screenshot_handler()
self._start_debug_reload_watcher()
self._start_debug_http_server()
self._start_heartbeat()
self._performance_stats["start_time"] = datetime.now()