优化:解耦帧调度与推理队列,提升单卡实时推理稳定性

This commit is contained in:
2026-03-31 14:35:52 +08:00
parent bb47b94c6e
commit 5dd9dc15d5
4 changed files with 270 additions and 39 deletions

114
main.py
View File

@@ -59,6 +59,7 @@ class EdgeInferenceService:
self._debug_http_server = None
self._debug_http_thread: Optional[threading.Thread] = None
self._heartbeat_thread: Optional[threading.Thread] = None
self._scheduler_thread: Optional[threading.Thread] = None
self._processing_threads: Dict[str, threading.Thread] = {}
self._stop_event = threading.Event()
@@ -68,6 +69,10 @@ class EdgeInferenceService:
"total_frames_processed": 0,
"total_alerts_generated": 0,
"uptime_seconds": 0,
"dropped_frames": 0,
"dropped_roi_tasks": 0,
"scheduler_cycles": 0,
"inference_batches": 0,
}
self._batch_roi_queue: List[tuple] = []
@@ -76,6 +81,9 @@ class EdgeInferenceService:
self._inference_thread: Optional[threading.Thread] = None
self._max_batch_size = 8
self._batch_timeout_sec = 0.05 # 50ms 攒批窗口
self._scheduler_interval_sec = 0.01
self._max_frame_age_sec = 0.5
self._max_pending_roi_items = self._max_batch_size * 32
# 摄像头级别告警去重:同一摄像头+告警类型在冷却期内只上报一次
self._camera_alert_cooldown: Dict[str, datetime] = {}
@@ -459,7 +467,6 @@ class EdgeInferenceService:
camera_id=camera.camera_id,
rtsp_url=camera.rtsp_url,
target_fps=self._settings.video_stream.default_fps,
on_frame_callback=self._create_frame_callback(camera.camera_id)
)
self._logger.info(f"已添加摄像头: {camera.camera_id}")
success_count += 1
@@ -525,7 +532,6 @@ class EdgeInferenceService:
camera_id=camera.camera_id,
rtsp_url=camera.rtsp_url,
target_fps=self._settings.video_stream.default_fps,
on_frame_callback=self._create_frame_callback(camera.camera_id)
)
# 立即启动新添加的流
self._stream_manager._streams[camera.camera_id].start()
@@ -586,12 +592,24 @@ class EdgeInferenceService:
except Exception as e:
self._logger.error(f"清理摄像头流失败: {e}")
def _create_frame_callback(self, camera_id: str):
    """Build a per-camera frame handler.

    Args:
        camera_id: Identifier captured by the returned closure and passed
            as the first argument of every ``_process_frame`` call.

    Returns:
        A one-argument callable that forwards the received frame to
        ``self._process_frame(camera_id, frame)`` and returns ``None``.
    """
    def handle_frame(frame):
        # Delegate to the shared processing path, tagging the frame with
        # the camera this handler was created for.
        self._process_frame(camera_id, frame)

    return handle_frame
def _enqueue_roi_items(self, roi_items: List[tuple]):
    """Append ROI tasks to the inference queue, dropping the oldest on congestion.

    The queue is capped at ``self._max_pending_roi_items``. When adding
    ``roi_items`` would exceed the cap, the oldest tasks are discarded:
    first from the head of the pending queue and, if the incoming batch
    alone is larger than the cap, from the head of the incoming batch too.
    Every discarded task is counted in
    ``self._performance_stats["dropped_roi_tasks"]``.

    Args:
        roi_items: Tasks to enqueue, oldest first. The caller's list is
            never mutated. An empty list is a no-op and does not signal
            the batch event.
    """
    if not roi_items:
        return

    with self._batch_lock:
        pending = self._batch_roi_queue
        overflow = max(
            0,
            len(pending) + len(roi_items) - self._max_pending_roi_items
        )
        if overflow > 0:
            # Evict from the existing queue first (those tasks are oldest).
            evicted = min(overflow, len(pending))
            del pending[:evicted]
            # Whatever overflow is left must come out of the new batch,
            # otherwise the queue would grow past its cap and the drop
            # counter would report tasks that were never dropped.
            roi_items = roi_items[overflow - evicted:]
            self._performance_stats["dropped_roi_tasks"] += overflow
        pending.extend(roi_items)

    # Signal that new work is available on the queue.
    self._batch_event.set()
def _process_frame(self, camera_id: str, frame: VideoFrame):
"""处理视频帧 - 批量处理多 ROI"""
try:
@@ -607,32 +625,53 @@ class EdgeInferenceService:
continue
if not roi.bindings:
continue
for bind in roi.bindings:
if not bind.enabled:
continue
try:
cropped, scale_info = self._preprocessor.preprocess_single(
frame.image, roi
)
roi_items.append((camera_id, roi, bind, frame, cropped, scale_info))
except Exception as e:
self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}")
enabled_binds = [bind for bind in roi.bindings if bind.enabled]
if not enabled_binds:
continue
try:
cropped, scale_info = self._preprocessor.preprocess_single(
frame.image, roi
)
except Exception as e:
self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}")
continue
for bind in enabled_binds:
roi_items.append((camera_id, roi, bind, frame, cropped, scale_info))
if not roi_items:
return
with self._batch_lock:
self._batch_roi_queue.extend(roi_items)
# 通知推理线程有新数据
self._batch_event.set()
self._enqueue_roi_items(roi_items)
self._performance_stats["total_frames_processed"] += 1
except Exception as e:
self._logger.error(f"处理帧失败 {camera_id}: {e}")
def _scheduler_worker(self):
    """Central scheduling loop: feed only the latest frame of each stream.

    Runs until ``self._stop_event`` is set. On every cycle it asks each
    stream returned by ``self._stream_manager.get_all_streams()`` for its
    latest frame without blocking (``timeout=0.0``). Frames older than
    ``self._max_frame_age_sec`` are discarded and counted in
    ``dropped_frames``; all others go to ``self._process_frame``.
    Preprocessing therefore happens on this thread and not inside the
    stream's own frame callback.

    ``scheduler_cycles`` is incremented once per cycle. When a cycle
    processes no frame, or fails, the loop waits on the stop event for
    ``self._scheduler_interval_sec`` so it neither spins nor delays
    shutdown.
    """
    while not self._stop_event.is_set():
        had_frame = False
        try:
            for stream in self._stream_manager.get_all_streams():
                frame = stream.get_latest_frame(timeout=0.0)
                if frame is None:
                    continue

                frame_age = (datetime.now() - frame.timestamp).total_seconds()
                if frame_age > self._max_frame_age_sec:
                    # Stale frame: skip it and record the drop.
                    self._performance_stats["dropped_frames"] += 1
                    continue

                had_frame = True
                self._process_frame(stream.camera_id, frame)
        except Exception as e:
            # An escaping exception would terminate this thread and stop
            # all frame processing; log it and retry on the next cycle.
            self._logger.error(f"帧调度循环异常: {e}")
            # Force the idle wait below so a persistent failure cannot
            # turn into a busy loop.
            had_frame = False

        self._performance_stats["scheduler_cycles"] += 1
        if not had_frame:
            self._stop_event.wait(self._scheduler_interval_sec)
def _batch_process_rois(self):
"""批量处理 ROI - 真正的 batch 推理(按 max_batch_size 分块)"""
@@ -661,6 +700,11 @@ class EdgeInferenceService:
# 一次性推理整个 batch
outputs, inference_time_ms = engine.infer(batch_data)
self._performance_stats["inference_batches"] += 1
self._logger.log_inference_latency(
inference_time_ms,
batch_size=len(chunk),
)
# 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别)
import numpy as np
@@ -938,6 +982,15 @@ class EdgeInferenceService:
self._stop_event.clear()
self._load_cameras()
self._stream_manager.start_all()
self._scheduler_thread = threading.Thread(
target=self._scheduler_worker,
name="FrameScheduler",
daemon=True
)
self._scheduler_thread.start()
self._logger.info("帧调度线程已启动")
# 启动独立推理线程(生产者-消费者模式)
self._inference_thread = threading.Thread(
@@ -948,8 +1001,6 @@ class EdgeInferenceService:
self._inference_thread.start()
self._logger.info("推理线程已启动")
self._stream_manager.start_all()
self._logger.info("Edge_Inference_Service 已启动")
self._register_signal_handlers()
@@ -979,6 +1030,9 @@ class EdgeInferenceService:
self._stop_event.set()
self._batch_event.set() # 唤醒推理线程以退出
if self._scheduler_thread and self._scheduler_thread.is_alive():
self._scheduler_thread.join(timeout=5)
if self._inference_thread and self._inference_thread.is_alive():
self._inference_thread.join(timeout=5)
@@ -1029,6 +1083,10 @@ class EdgeInferenceService:
"uptime_seconds": self._performance_stats["uptime_seconds"],
"total_frames_processed": self._performance_stats["total_frames_processed"],
"total_alerts_generated": self._performance_stats["total_alerts_generated"],
"dropped_frames": self._performance_stats["dropped_frames"],
"dropped_roi_tasks": self._performance_stats["dropped_roi_tasks"],
"inference_batches": self._performance_stats["inference_batches"],
"scheduler_cycles": self._performance_stats["scheduler_cycles"],
"stream_manager": (
self._stream_manager.get_statistics()
if self._stream_manager else {}