From 5dd9dc15d55a01f42891f507b4be0619c96dfc4e Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 31 Mar 2026 14:35:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E8=A7=A3=E8=80=A6?= =?UTF-8?q?=E5=B8=A7=E8=B0=83=E5=BA=A6=E4=B8=8E=E6=8E=A8=E7=90=86=E9=98=9F?= =?UTF-8?q?=E5=88=97=EF=BC=8C=E6=8F=90=E5=8D=87=E5=8D=95=E5=8D=A1=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E6=8E=A8=E7=90=86=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/tensorrt_engine.py | 2 +- core/video_stream.py | 21 ++-- docs/edge-inference-optimization.md | 172 ++++++++++++++++++++++++++++ main.py | 114 +++++++++++++----- 4 files changed, 270 insertions(+), 39 deletions(-) create mode 100644 docs/edge-inference-optimization.md diff --git a/core/tensorrt_engine.py b/core/tensorrt_engine.py index 15484ab..0484037 100644 --- a/core/tensorrt_engine.py +++ b/core/tensorrt_engine.py @@ -116,7 +116,7 @@ class TensorRTEngine: if self._context is not None: self._release_resources() - self._cuda_context = cuda.Device(0).make_context() + self._cuda_context = cuda.Device(self.config.device_id).make_context() self._stream = cuda.Stream() TRT_LOGGER = trt.Logger(trt.Logger.WARNING) diff --git a/core/video_stream.py b/core/video_stream.py index 67e8616..ef6c1b0 100644 --- a/core/video_stream.py +++ b/core/video_stream.py @@ -102,6 +102,7 @@ class RTSPStreamReader: self._logger = get_logger("video_stream") self._lock = threading.Lock() + self._latest_frame: Optional[VideoFrame] = None @property def is_connected(self) -> bool: @@ -221,6 +222,8 @@ class RTSPStreamReader: pass self._frame_buffer.put_nowait(frame_obj) + with self._lock: + self._latest_frame = frame_obj if self._on_frame_callback: self._on_frame_callback(frame_obj) @@ -293,6 +296,9 @@ class RTSPStreamReader: self._frame_buffer.get_nowait() except queue.Empty: break + + with self._lock: + self._latest_frame = None self._logger.info(f"视频流已停止: {self.camera_id}") @@ -305,16 +311,11 @@ class RTSPStreamReader: def get_latest_frame(self, timeout: float = 1.0) -> Optional[VideoFrame]: """获取最新帧(丢弃中间帧)""" - try: - while True: - try: - frame = self._frame_buffer.get_nowait() - if self._frame_buffer.empty(): - return frame - except queue.Empty: - return None - except Exception: - return None + del timeout + with self._lock: + frame = self._latest_frame + self._latest_frame = None + return frame def get_frame_batch(self, max_count: int = 8, timeout: float = 2.0) -> List[VideoFrame]: diff --git a/docs/edge-inference-optimization.md b/docs/edge-inference-optimization.md new file mode 100644 index 0000000..e8b37ef --- /dev/null +++ b/docs/edge-inference-optimization.md @@ -0,0 +1,172 @@ +# Edge 推理结构优化说明 + +## 目标 + +本次优化只调整 `security-ai-edge` 内部实时推理链路,目标是: + +- 保持现有功能不变 +- 保持与其他项目的现有对接方式不变 +- 提升单卡场景下的实时稳定性 +- 降低解码线程被 CPU 预处理拖慢的风险 +- 在高并发下优先处理最新帧,避免旧帧堆积 + +本次未修改以下外部行为: + +- WVP 下发摄像头与 ROI 配置的方式 +- Redis / MQTT / HTTP 对接协议 +- 告警生成、告警去重、截图回调、心跳上报的外部接口 +- TensorRT engine 文件加载方式 + +## 改动概览 + +### 1. 解耦读帧线程与推理预处理 + +优化前: + +- `RTSPStreamReader` 读到帧后,直接通过回调进入 `_process_frame` +- `_process_frame` 内部会做 ROI 遍历、裁剪、预处理、入推理队列 +- 当 ROI 多、CPU 紧张时,读帧线程容易被预处理阻塞 + +优化后: + +- `RTSPStreamReader` 只负责读帧和保存“最新帧” +- 新增中心调度线程 `FrameScheduler` +- 调度线程从每路流提取最新帧,再统一调用 `_process_frame` + +收益: + +- 解码线程不再承担重 CPU 预处理 +- 拉流稳定性更高 +- 更符合实时视频系统“最新帧优先”的处理模型 + +### 2. 每路流改为“最新帧消费” + +优化前: + +- `get_latest_frame()` 通过清空队列获取最后一帧 +- 在高并发下,队列遍历和频繁出队会增加额外开销 + +优化后: + +- 流对象内部维护 `_latest_frame` +- 调度线程直接消费这份最新帧引用 +- 中间帧天然被覆盖,不再积压到后面阶段 + +收益: + +- 获取最新帧成本更低 +- 更适合实时系统而非离线批处理 + +### 3. ROI 预处理结果复用 + +优化前: + +- 同一 ROI 下如果绑定多个算法,会重复执行 `preprocess_single` + +优化后: + +- 先筛出启用的 `bind` +- 对每个 ROI 只执行一次预处理 +- 多个 `bind` 复用同一份裁剪图和缩放信息 + +收益: + +- 降低 CPU crop / resize / normalize 开销 +- ROI 绑定越多,收益越明显 + +### 4. 推理队列增加拥塞保护 + +优化前: + +- ROI 任务持续追加到 `_batch_roi_queue` +- 在输入高于推理吞吐时,可能出现旧任务堆积 + +优化后: + +- 为 `_batch_roi_queue` 增加最大挂起量限制 +- 超限时丢弃最旧 ROI 任务 + +收益: + +- 控制排队长度 +- 避免系统在高压时持续处理过期画面 + +说明: + +- 这是实时系统常见策略,目标是“宁可丢旧帧,不处理过期帧” + +### 5. TensorRT 设备选择修正 + +优化前: + +- `TensorRT` 初始化时固定使用 `cuda.Device(0)` +- 即使配置里指定了 `device_id`,实际也不会生效 + +优化后: + +- 改为使用 `self.config.device_id` + +收益: + +- 保留现有单卡行为 +- 为后续指定 GPU 设备提供正确基础 + +## 当前结构 + +优化后的主链路为: + +1. `RTSPStreamReader` 拉流并保存最新帧 +2. `FrameScheduler` 周期性轮询所有流 +3. `_process_frame` 生成 ROI 推理任务 +4. `_batch_roi_queue` 聚合任务并做拥塞保护 +5. `InferenceWorker` 做小批量 TensorRT 推理 +6. `PostProcessor` + `AlgorithmManager` 输出业务告警 +7. `ResultReporter` / `AlarmUploadWorker` 继续负责上报 + +## 兼容性说明 + +本次优化没有改变这些关键约束: + +- 摄像头仍然按 ROI 配置启动和停用 +- 算法处理入口 `_handle_detections` 不变 +- 现有告警冷却、ROI 去重、resolve 机制不变 +- 现有外部平台无需修改配置或接口 + +因此它属于“内部调度优化”,而不是“系统集成改造”。 + +## 新增运行指标 + +`get_status()` 新增了以下统计项: + +- `dropped_frames` +- `dropped_roi_tasks` +- `inference_batches` +- `scheduler_cycles` + +这些指标用于判断: + +- 是否存在旧帧丢弃 +- 是否出现 ROI 队列拥塞 +- 推理批次是否稳定 +- 调度线程是否持续运行 + +## 仍可继续优化的方向 + +本次改造重点是“在当前架构下提升实时性和稳定性”,后续还可以继续做: + +- 将 OpenCV/FFmpeg CPU 解码切换到 GPU 硬解 +- 将调度参数做成配置项,例如最大帧年龄、调度周期、最大挂起 ROI 数 +- 为不同摄像头增加优先级调度 +- 为高频 ROI 增加更细粒度缓存和复用 +- 为推理链路补充阶段耗时统计:调度、预处理、推理、后处理 + +## 适用场景 + +这版结构更适合: + +- 单卡部署 +- 多路 RTSP 并发 +- 小 batch 实时推理 +- 需要低延迟而非离线全量处理的场景 + +如果后续要切到 GPU 解码,当前这版调度结构也更容易继续演进,因为“读帧”和“处理帧”已经拆开了。 diff --git a/main.py b/main.py index 75ac0f3..93cabad 100644 --- a/main.py +++ b/main.py @@ -59,6 +59,7 @@ class EdgeInferenceService: self._debug_http_server = None self._debug_http_thread: Optional[threading.Thread] = None self._heartbeat_thread: Optional[threading.Thread] = None + self._scheduler_thread: Optional[threading.Thread] = None self._processing_threads: Dict[str, threading.Thread] = {} self._stop_event = threading.Event() @@ -68,6 +69,10 @@ class EdgeInferenceService: "total_frames_processed": 0, "total_alerts_generated": 0, "uptime_seconds": 0, + "dropped_frames": 0, + "dropped_roi_tasks": 0, + "scheduler_cycles": 0, + "inference_batches": 0, } self._batch_roi_queue: List[tuple] = [] @@ -76,6 +81,9 @@ class EdgeInferenceService: self._inference_thread: Optional[threading.Thread] = None self._max_batch_size = 8 self._batch_timeout_sec = 0.05 # 50ms 攒批窗口 + self._scheduler_interval_sec = 0.01 + self._max_frame_age_sec = 0.5 + self._max_pending_roi_items = self._max_batch_size * 32 # 摄像头级别告警去重:同一摄像头+告警类型在冷却期内只上报一次 self._camera_alert_cooldown: Dict[str, datetime] = {} @@ -459,7 +467,6 @@ class EdgeInferenceService: camera_id=camera.camera_id, rtsp_url=camera.rtsp_url, target_fps=self._settings.video_stream.default_fps, - on_frame_callback=self._create_frame_callback(camera.camera_id) ) self._logger.info(f"已添加摄像头: {camera.camera_id}") success_count += 1 @@ -525,7 +532,6 @@ class EdgeInferenceService: camera_id=camera.camera_id, rtsp_url=camera.rtsp_url, target_fps=self._settings.video_stream.default_fps, - on_frame_callback=self._create_frame_callback(camera.camera_id) ) # 立即启动新添加的流 self._stream_manager._streams[camera.camera_id].start() @@ -586,12 +592,24 @@ class EdgeInferenceService: except Exception as e: self._logger.error(f"清理摄像头流失败: {e}") - def _create_frame_callback(self, camera_id: str): - """创建帧处理回调""" - def callback(frame): - self._process_frame(camera_id, frame) - return callback - + def _enqueue_roi_items(self, roi_items: List[tuple]): + """向推理队列追加 ROI 任务,并在拥塞时丢弃最旧任务。""" + if not roi_items: + return + + with self._batch_lock: + overflow = max( + 0, + len(self._batch_roi_queue) + len(roi_items) - self._max_pending_roi_items + ) + if overflow > 0: + del self._batch_roi_queue[:overflow] + self._performance_stats["dropped_roi_tasks"] += overflow + + self._batch_roi_queue.extend(roi_items) + + self._batch_event.set() + def _process_frame(self, camera_id: str, frame: VideoFrame): """处理视频帧 - 批量处理多 ROI""" try: @@ -607,32 +625,53 @@ class EdgeInferenceService: continue if not roi.bindings: continue - - for bind in roi.bindings: - if not bind.enabled: - continue - - try: - cropped, scale_info = self._preprocessor.preprocess_single( - frame.image, roi - ) - roi_items.append((camera_id, roi, bind, frame, cropped, scale_info)) - except Exception as e: - self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}") + + enabled_binds = [bind for bind in roi.bindings if bind.enabled] + if not enabled_binds: + continue + + try: + cropped, scale_info = self._preprocessor.preprocess_single( + frame.image, roi + ) + except Exception as e: + self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}") + continue + + for bind in enabled_binds: + roi_items.append((camera_id, roi, bind, frame, cropped, scale_info)) if not roi_items: return - with self._batch_lock: - self._batch_roi_queue.extend(roi_items) - - # 通知推理线程有新数据 - self._batch_event.set() - + self._enqueue_roi_items(roi_items) self._performance_stats["total_frames_processed"] += 1 except Exception as e: self._logger.error(f"处理帧失败 {camera_id}: {e}") + + def _scheduler_worker(self): + """中心调度线程:只消费各路最新帧,避免解码线程被预处理阻塞。""" + while not self._stop_event.is_set(): + had_frame = False + + for stream in self._stream_manager.get_all_streams(): + frame = stream.get_latest_frame(timeout=0.0) + if frame is None: + continue + + frame_age = (datetime.now() - frame.timestamp).total_seconds() + if frame_age > self._max_frame_age_sec: + self._performance_stats["dropped_frames"] += 1 + continue + + had_frame = True + self._process_frame(stream.camera_id, frame) + + self._performance_stats["scheduler_cycles"] += 1 + + if not had_frame: + self._stop_event.wait(self._scheduler_interval_sec) def _batch_process_rois(self): """批量处理 ROI - 真正的 batch 推理(按 max_batch_size 分块)""" @@ -661,6 +700,11 @@ class EdgeInferenceService: # 一次性推理整个 batch outputs, inference_time_ms = engine.infer(batch_data) + self._performance_stats["inference_batches"] += 1 + self._logger.log_inference_latency( + inference_time_ms, + batch_size=len(chunk), + ) # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) import numpy as np @@ -938,6 +982,15 @@ class EdgeInferenceService: self._stop_event.clear() self._load_cameras() + self._stream_manager.start_all() + + self._scheduler_thread = threading.Thread( + target=self._scheduler_worker, + name="FrameScheduler", + daemon=True + ) + self._scheduler_thread.start() + self._logger.info("帧调度线程已启动") # 启动独立推理线程(生产者-消费者模式) self._inference_thread = threading.Thread( @@ -948,8 +1001,6 @@ class EdgeInferenceService: self._inference_thread.start() self._logger.info("推理线程已启动") - self._stream_manager.start_all() - self._logger.info("Edge_Inference_Service 已启动") self._register_signal_handlers() @@ -979,6 +1030,9 @@ class EdgeInferenceService: self._stop_event.set() self._batch_event.set() # 唤醒推理线程以退出 + if self._scheduler_thread and self._scheduler_thread.is_alive(): + self._scheduler_thread.join(timeout=5) + if self._inference_thread and self._inference_thread.is_alive(): self._inference_thread.join(timeout=5) @@ -1029,6 +1083,10 @@ class EdgeInferenceService: "uptime_seconds": self._performance_stats["uptime_seconds"], "total_frames_processed": self._performance_stats["total_frames_processed"], "total_alerts_generated": self._performance_stats["total_alerts_generated"], + "dropped_frames": self._performance_stats["dropped_frames"], + "dropped_roi_tasks": self._performance_stats["dropped_roi_tasks"], + "inference_batches": self._performance_stats["inference_batches"], + "scheduler_cycles": self._performance_stats["scheduler_cycles"], "stream_manager": ( self._stream_manager.get_statistics() if self._stream_manager else {}