From 5a0265de52079bcaf2334e4cf6329ca836188580 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 7 Apr 2026 14:05:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9AP0+P1=20=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E7=A8=B3=E5=AE=9A=E6=80=A7=E5=92=8C=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=886=E9=A1=B9=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0 稳定性修复: - 告警去重字典添加惰性清理机制,防止长时间运行内存溢出 - Redis 连接断开时显式 close() 后再置 None,防止文件描述符泄漏 - 截图消息 ACK 移至成功路径,失败消息留在 pending list 自动重试 P1 性能优化: - GPU NMS 添加 torch.no_grad() + 显式释放临时张量,减少显存碎片 - 截图存储改为 Redis 原始 bytes,去掉 Base64 编解码开销(兼容旧格式) - ROI 配置查询 N+1 改为 get_all_bindings() 单次 JOIN 查询 Co-Authored-By: Claude Opus 4.6 (1M context) --- config/database.py | 31 +++ core/alarm_upload_worker.py | 51 ++++- core/config_sync.py | 25 +-- core/postprocessor.py | 34 ++-- core/result_reporter.py | 32 +++- core/screenshot_handler.py | 50 ++++- docs/p0p1_review_report.md | 372 ++++++++++++++++++++++++++++++++++++ main.py | 39 +++- 8 files changed, 593 insertions(+), 41 deletions(-) create mode 100644 docs/p0p1_review_report.md diff --git a/config/database.py b/config/database.py index 912c6bc..03afb25 100644 --- a/config/database.py +++ b/config/database.py @@ -884,6 +884,37 @@ class SQLiteManager: except Exception as e: logger.error(f"获取摄像头算法绑定失败: {e}") return [] + + def get_all_bindings(self) -> List[Dict[str, Any]]: + """获取所有启用的算法绑定(一次查询,避免 N+1)""" + try: + cursor = self._conn.cursor() + cursor.execute(""" + SELECT b.bind_id, b.roi_id, b.algo_code, b.params, b.priority, + b.enabled, b.created_at, b.updated_at, + a.algo_name, a.target_class + FROM roi_algo_bind b + LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code + WHERE b.enabled = 1 + ORDER BY b.priority DESC + """) + results = [] + for row in cursor.fetchall(): + result = dict(zip( + ['bind_id', 'roi_id', 'algo_code', 'params', 'priority', + 'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class'], + row + )) + if result.get('params') and isinstance(result['params'], str): + try: + result['params'] = json.loads(result['params']) + except (json.JSONDecodeError, TypeError): + pass + results.append(result) + return results + except Exception as e: + logger.error(f"获取所有算法绑定失败: {e}") + return [] def delete_roi_algo_bind(self, bind_id: str) -> bool: """删除ROI算法绑定""" diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 0af1bd6..436a554 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -47,6 +47,7 @@ class AlarmUploadWorker: self._logger = logging.getLogger("alarm_upload_worker") self._redis: Optional[redis.Redis] = None + self._redis_binary: Optional[redis.Redis] = None # 用于读取截图 bytes self._cos_client = None # 懒初始化 self._thread: Optional[threading.Thread] = None @@ -80,6 +81,16 @@ class AlarmUploadWorker: ) self._redis.ping() self._logger.info(f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}") + + # 二进制 Redis 连接(用于读取截图 bytes,不做 decode) + self._redis_binary = redis.Redis( + host=redis_cfg.host, + port=redis_cfg.port, + db=redis_cfg.db, + password=redis_cfg.password, + decode_responses=False, + socket_connect_timeout=5, + ) except Exception as e: self._logger.error(f"Worker Redis 连接失败: {e}") return @@ -136,6 +147,12 @@ class AlarmUploadWorker: except Exception: pass + if self._redis_binary: + try: + self._redis_binary.close() + except Exception: + pass + self._logger.info("AlarmUploadWorker 已停止") def _worker_loop(self): @@ -184,21 +201,43 @@ class AlarmUploadWorker: self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})") - # Step 1: 上传截图到 COS(从 base64 解码后直接上传字节流) + # Step 1: 上传截图到 COS + snapshot_key = (alarm_data.get("ext_data") or {}).get("_snapshot_key") snapshot_b64 = alarm_data.get("snapshot_b64") object_key = None - if snapshot_b64: + if snapshot_key: + # 新格式:从独立 Redis key 获取原始 bytes + try: + image_bytes = self._redis_binary.get(snapshot_key) if self._redis_binary else None + if image_bytes is None: + self._logger.warning(f"截图 key 已过期: {snapshot_key}, 无截图继续上报") + else: + object_key = self._upload_snapshot_to_cos( + image_bytes, alarm_id, alarm_data.get("device_id", "unknown") + ) + if object_key is None: + self._handle_retry(alarm_json, "COS 上传失败") + return + # 上传成功后删除临时 key + try: + if self._redis_binary: + self._redis_binary.delete(snapshot_key) + except Exception: + pass + except Exception as e: + self._logger.error(f"截图获取/上传失败: {e}") + self._handle_retry(alarm_json, f"截图处理失败: {e}") + return + elif snapshot_b64: + # 兼容旧格式 (Base64) try: import base64 image_bytes = base64.b64decode(snapshot_b64) object_key = self._upload_snapshot_to_cos( - image_bytes, - alarm_id, - alarm_data.get("device_id", "unknown"), + image_bytes, alarm_id, alarm_data.get("device_id", "unknown") ) if object_key is None: - # COS 上传失败,进入重试 self._handle_retry(alarm_json, "COS 上传失败") return except Exception as e: diff --git a/core/config_sync.py b/core/config_sync.py index 8db730e..6083196 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -215,6 +215,15 @@ class ConfigSyncManager: logger.error(f"本地 Redis 连接失败: {e}") self._local_redis = None + def _safe_close_cloud_redis(self): + """安全关闭云端 Redis 连接""" + if self._cloud_redis is not None: + try: + self._cloud_redis.close() + except Exception: + pass + self._cloud_redis = None + def _init_cloud_redis(self): """初始化云端 Redis 连接""" try: @@ -238,7 +247,7 @@ class ConfigSyncManager: except Exception as e: logger.warning(f"云端 Redis 连接失败(将使用本地缓存运行): {e}") - self._cloud_redis = None + self._safe_close_cloud_redis() def _init_database(self): """初始化 SQLite 数据库连接""" @@ -311,9 +320,7 @@ class ConfigSyncManager: try: cameras = self._db_manager.get_all_camera_configs() rois = self._db_manager.get_all_roi_configs() - binds = [] - for roi in rois: - binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"])) + binds = self._db_manager.get_all_bindings() logger.info(f"[EDGE] Loading config from local db ({source})...") logger.info(f"[EDGE] Camera count = {len(cameras)}") logger.info(f"[EDGE] ROI count = {len(rois)}") @@ -378,7 +385,7 @@ class ConfigSyncManager: if self._stop_event.is_set(): return logger.warning(f"云端 Redis 连接断开: {e}, {backoff}s 后重连...") - self._cloud_redis = None + self._safe_close_cloud_redis() self._stop_event.wait(backoff) backoff = min(backoff * 2, max_backoff) @@ -776,10 +783,7 @@ class ConfigSyncManager: bindings_list = self._db_manager.get_bindings_by_camera(camera_id) else: roi_configs = self._db_manager.get_all_roi_configs() - bindings_list = [] - for roi in roi_configs: - bindings = self._db_manager.get_bindings_by_roi(roi['roi_id']) - bindings_list.extend(bindings) + bindings_list = self._db_manager.get_all_bindings() roi_dict = {r['roi_id']: r for r in roi_configs} bindings_dict: Dict[str, list] = {} @@ -857,8 +861,7 @@ class ConfigSyncManager: binds: List[Dict[str, Any]] = [] rois = self._db_manager.get_all_roi_configs() - for roi in rois: - binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"])) + binds = self._db_manager.get_all_bindings() return binds def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]: diff --git a/core/postprocessor.py b/core/postprocessor.py index 1c67d01..92590e2 100644 --- a/core/postprocessor.py +++ b/core/postprocessor.py @@ -78,22 +78,24 @@ class NMSProcessor: max_output_size: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """GPU 加速 NMS""" - boxes_t = torch.from_numpy(boxes).cuda() - scores_t = torch.from_numpy(scores).cuda() - - keep = torch_nms(boxes_t, scores_t, iou_threshold=self.nms_threshold) - - keep_np = keep.cpu().numpy() - - if len(keep_np) > max_output_size: - top_k = np.argsort(scores[keep_np])[::-1][:max_output_size] - keep_np = keep_np[top_k] - - return ( - keep_np.astype(np.int32), - scores[keep_np], - class_ids[keep_np] if class_ids is not None else np.array([]) - ) + with torch.no_grad(): + boxes_t = torch.from_numpy(boxes).cuda() + scores_t = torch.from_numpy(scores).cuda() + + keep = torch_nms(boxes_t, scores_t, iou_threshold=self.nms_threshold) + + keep_np = keep.cpu().numpy() + del boxes_t, scores_t, keep + + if len(keep_np) > max_output_size: + top_k = np.argsort(scores[keep_np])[::-1][:max_output_size] + keep_np = keep_np[top_k] + + return ( + keep_np.astype(np.int32), + scores[keep_np], + class_ids[keep_np] if class_ids is not None else np.array([]) + ) def _process_cpu( self, diff --git a/core/result_reporter.py b/core/result_reporter.py index 816dd07..9fbe117 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -112,9 +112,20 @@ class ResultReporter: self._logger.info( f"Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}" ) + + # 二进制 Redis 连接(用于存储截图 bytes,不做 decode) + self._redis_binary = redis.Redis( + host=redis_cfg.host, + port=redis_cfg.port, + db=redis_cfg.db, + password=redis_cfg.password, + decode_responses=False, + socket_connect_timeout=5, + ) except Exception as e: self._logger.error(f"Redis 连接失败: {e}") self._redis = None + self._redis_binary = None def report_alarm(self, alarm_info: AlarmInfo, screenshot: Optional[np.ndarray] = None) -> bool: """ @@ -129,13 +140,22 @@ class ResultReporter: """ self._performance_stats["alerts_generated"] += 1 - # 将截图编码为 JPEG base64,直接通过 Redis 传递给 Worker 上传 COS + # 将截图编码为 JPEG,直接存储 bytes 到独立 Redis key,避免 Base64 开销 if screenshot is not None: try: import cv2 - import base64 success, buffer = cv2.imencode('.jpg', screenshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) - if success: + if success and self._redis_binary is not None: + snapshot_key = f"local:alarm:snapshot:{alarm_info.alarm_id}" + # 直接存储 JPEG bytes,避免 Base64 编解码开销 + self._redis_binary.set(snapshot_key, buffer.tobytes(), ex=3600) + alarm_info.snapshot_b64 = None + if alarm_info.ext_data is None: + alarm_info.ext_data = {} + alarm_info.ext_data["_snapshot_key"] = snapshot_key + elif success: + # 降级:无二进制 Redis 连接时使用 Base64 + import base64 alarm_info.snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('ascii') else: self._logger.warning("截图 JPEG 编码失败") @@ -211,6 +231,12 @@ class ResultReporter: except Exception: pass + if hasattr(self, '_redis_binary') and self._redis_binary: + try: + self._redis_binary.close() + except Exception: + pass + self._logger.info("ResultReporter 清理完成") def cleanup(self): diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 4dc8010..128af0f 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -59,6 +59,7 @@ class ScreenshotHandler: self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() + self._last_pending_check = 0.0 # ==================== 生命周期 ==================== @@ -180,20 +181,26 @@ class ScreenshotHandler: backoff = 5 # 重置退避 + # 每 60 秒检查一次 pending 消息 + import time as _time + if _time.time() - self._last_pending_check > 60: + self._last_pending_check = _time.time() + self._cleanup_pending_messages() + for stream_name, messages in results: for msg_id, fields in messages: try: self._handle_request(fields) - except Exception as e: - logger.error("[截图] 处理请求失败: %s", e) - finally: - # ACK 消息 + # 处理成功才 ACK try: self._cloud_redis.xack( SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id ) except Exception: pass + except Exception as e: + logger.error("[截图] 处理请求失败 (msg_id=%s): %s", msg_id, e) + # 不 ACK,消息留在 pending list 等待重试 except redis.ConnectionError as e: if self._stop_event.is_set(): @@ -409,3 +416,38 @@ class ScreenshotHandler: logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id) except Exception as e: logger.error("[截图] 降级写 Redis 也失败: %s", e) + + # ==================== Pending 消息清理 ==================== + + _MAX_RETRY_COUNT = 3 + _PENDING_IDLE_MS = 30000 # 消息 pending 超过 30 秒才处理 + + def _cleanup_pending_messages(self): + """清理 pending list 中重试次数过多的消息""" + try: + pending = self._cloud_redis.xpending_range( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, + min="-", max="+", count=50, + consumername=self._consumer_name + ) + for entry in pending: + msg_id = entry['message_id'] + delivery_count = entry['times_delivered'] + idle_ms = entry['time_since_delivered'] + + if idle_ms < self._PENDING_IDLE_MS: + continue + + if delivery_count > self._MAX_RETRY_COUNT: + logger.warning( + "[截图] 消息超过最大重试次数,丢弃: msg_id=%s, retries=%d", + msg_id, delivery_count + ) + try: + self._cloud_redis.xack( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id + ) + except Exception: + pass + except Exception as e: + logger.debug("[截图] 检查 pending list: %s", e) diff --git a/docs/p0p1_review_report.md b/docs/p0p1_review_report.md new file mode 100644 index 0000000..2aaaec5 --- /dev/null +++ b/docs/p0p1_review_report.md @@ -0,0 +1,372 @@ +# P0+P1 修复涉及文件全面审查报告 + +> 审查日期: 2026-04-02 +> 审查范围: main.py, config_sync.py, screenshot_handler.py, tensorrt_engine.py, result_reporter.py, alarm_upload_worker.py, postprocessor.py 及相关依赖 + +--- + +## 1. 功能基线清单 + +### 1.1 main.py - EdgeInferenceService + +| 方法 | 行号 | 行为描述 | +|------|------|----------| +| `__init__` | 42-96 | 初始化两个去重字典: `_camera_alert_cooldown` (摄像头级, Dict[str, datetime]) 和 `_active_alarms` (ROI级, Dict[str, str])。冷却期默认30秒。 | +| `_handle_detections` | 790-954 | 核心告警处理入口。接收检测结果后: (1) 调用算法管理器获取alerts; (2) 对 `alarm_resolve` 类型从 `_active_alarms` 中清除对应记录; (3) ROI级去重: 检查 `_active_alarms[f"{roi_id}_{alert_type}"]` 是否存在; (4) 摄像头级去重: 检查 `_camera_alert_cooldown[f"{camera_id}_{alert_type}"]` 时间间隔; (5) 构建 AlarmInfo 并调用 `report_alarm()`; (6) 写入 `_active_alarms` 并回填 alarm_id 到算法实例。 | +| `_batch_process_rois` | 676-751 | 从队列取出 ROI 任务, 按 max_batch_size=8 分块, 调用 TensorRT 推理, 后处理后逐个调用 `_handle_detections`。 | +| `_process_frame` | 613-651 | 获取 ROI 配置(含绑定), 预处理裁剪, 组装 roi_items 推入队列。 | +| `_scheduler_worker` | 653-674 | 中心调度线程, 轮询所有视频流取最新帧, 丢弃超龄帧(>0.5s), 调用 `_process_frame`。 | +| `_inference_worker` | 956-976 | 推理线程, 攒批窗口50ms, 调用 `_batch_process_rois`。 | + +**`_camera_alert_cooldown` 读写位置:** +- 写入: 第900行 (`self._camera_alert_cooldown[dedup_key] = now`) +- 读取: 第890行 (`self._camera_alert_cooldown.get(dedup_key)`) +- 无其他模块引用此字典 + +**`_active_alarms` 读写位置:** +- 写入: 第940行 (`self._active_alarms[active_key] = alarm_info.alarm_id`) +- 删除: 第865-868行 (resolve 事件清除) +- 读取: 第880行 (`active_key in self._active_alarms`) +- 无其他模块引用此字典 + +### 1.2 config/config_sync.py - ConfigSyncManager + +| 方法 | 行为描述 | +|------|----------| +| `_init_cloud_redis` (219-241) | 创建云端Redis连接, 参数: socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, socket_keepalive=True + TCP keepalive选项, health_check_interval=15。连接失败时将 `_cloud_redis` 设为 None 但不抛异常。 | +| `_listen_config_stream` (326-390) | Stream 监听主循环。外层while循环: 若 `_cloud_redis` 为None则调用 `_init_cloud_redis()` 重连。内层while循环: XREAD BLOCK 5000ms, 无消息时 PING 保活。`redis.ConnectionError` except 块: 将 `_cloud_redis = None` (第381行)。通用 `Exception` except 块: 不置 None, 仅等待重试。 | +| `get_roi_configs_with_bindings` (760-806) | 当传入 camera_id 时走优化路径 `get_bindings_by_camera(camera_id)` (单SQL JOIN查询)。当无 camera_id 时存在 N+1 问题: 先 `get_all_roi_configs()` 再逐个 `get_bindings_by_roi(roi_id)`。 | + +**`self._cloud_redis = None` 出现位置:** +- 第381行: `_listen_config_stream` 中 `redis.ConnectionError` except 块 +- 第241行: `_init_cloud_redis` 中初始化失败 + +### 1.3 core/screenshot_handler.py - ScreenshotHandler + +| 方法 | 行为描述 | +|------|----------| +| `_listen_loop` (159-220) | XREADGROUP 消费截图请求。finally 块中(第191-196行) ACK 消息, ACK 失败时静默 pass -- 意味着消息可能被重复消费。无消息时 PING 保活。ConnectionError 时重连(带指数退避)。当 `_cloud_redis is None` 时也会主动重连。 | +| `_handle_request` (224-280) | 流程: 校验必填字段 -> 设备隔离检查 -> 抓帧 -> 上传COS(失败重试1次) -> HTTP回调(失败降级写Redis)。 | +| `_capture_frame` (284-300) | 优先从 MultiStreamManager 获取已有流帧, 无流时降级临时 RTSP 连接抓帧。 | +| `_send_result` (375-402) | 优先 HTTP 回调, 失败降级写 Redis key `snap:result:{request_id}` (TTL 60s)。 | + +**补偿机制:** 除 COS 上传有1次重试、HTTP 回调有 Redis 降级外, 无其他补偿。xack 失败意味着 Redis 会再次投递该消息(at-least-once 语义)。 + +### 1.4 core/tensorrt_engine.py - TensorRTEngine + +| 方法 | 行为描述 | +|------|----------| +| `load_engine` (109-142) | 在 `_lock` 保护下: (1) 若已有 context 则先释放; (2) **创建新的 CUDA context** `cuda.Device(device_id).make_context()`; (3) 创建 CUDA Stream; (4) 反序列化 engine; (5) 创建 execution context; (6) 分配 buffers。 | +| `infer` (184-253) | 在 `_lock` 保护下: (1) `_cuda_context.push()`; (2) 设置动态 input shape; (3) H2D async memcpy; (4) `execute_async_v2`; (5) D2H async memcpy; (6) `stream.synchronize()`; (7) finally 块中 `_cuda_context.pop()`。 | +| `release` (317-325) | 在 `_lock` 保护下, 幂等释放: `_cuda_context.pop()` + `_cuda_context.detach()`。 | +| `_release_resources` (294-315) | 内部释放: pop/detach CUDA context, synchronize stream, 清空 bindings。 | + +**CUDA context 模式:** 每个 TensorRTEngine 实例创建独立的 CUDA context。`pycuda.autoinit` 在 import 时创建一个默认 context, 而 `load_engine` 再创建一个新的。`infer` 使用 push/pop 模式切换到自己的 context。 + +**`_lock` 使用范围:** 覆盖 `load_engine`, `infer`, `release` 三个公开方法, 保证单引擎实例的线程安全。 + +**EngineManager:** 持有 `Dict[str, TensorRTEngine]`, 有自己的 `_lock`。当前代码只创建一个 "default" 引擎, 不存在多引擎共享 CUDA context 的问题。 + +### 1.5 core/result_reporter.py - ResultReporter + +| 方法 | 行为描述 | +|------|----------| +| `report_alarm` (119-165) | 接收 AlarmInfo + numpy screenshot。将截图 cv2.imencode JPEG (quality=85) 后 base64 编码写入 `alarm_info.snapshot_b64`。然后 JSON 序列化后 LPUSH 到 `local:alarm:pending`。 | +| `report_alarm_resolve` (167-180) | 将 resolve 数据附加 `_type: "resolve"` 标记后 LPUSH 到同一队列 `local:alarm:pending`。 | + +**AlarmInfo 字段:** alarm_id, alarm_type, device_id, scene_id, event_time(ISO8601), alarm_level(0-3), snapshot_b64(Optional[str]), algorithm_code, confidence_score, ext_data(Dict)。 + +### 1.6 core/alarm_upload_worker.py - AlarmUploadWorker + +| 方法 | 行为描述 | +|------|----------| +| `_worker_loop` (141-166) | 主循环: 先处理重试队列, 再 BRPOP `local:alarm:pending` (timeout=2s)。 | +| `_process_alarm` (169-230) | 流程: JSON解析 -> 检查 `_type=="resolve"` 分流 -> base64.b64decode 截图 -> 上传 COS -> HTTP POST 告警元数据。COS 上传失败或 HTTP 失败都进入 `_handle_retry`。 | +| `_handle_retry` (386-425) | 递增 `_retry_count`, 超过 `retry_max` 则 LPUSH 到 `local:alarm:dead`。未超限则计算指数退避延迟, 附加 `_retry_at` 时间戳后 LPUSH 到 `local:alarm:retry`。 | +| `_process_retry_queue` (427-465) | RPOP 逐条检查 `local:alarm:retry`, 到期的放回 pending, 未到期的放回 retry 头部。 | +| `_upload_snapshot_to_cos` (263-316) | base64 解码后直接 `put_object` 上传, object_key 格式: `alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg`。 | + +### 1.7 core/postprocessor.py - PostProcessor + +| 方法 | 行为描述 | +|------|----------| +| `_process_gpu` (73-96) | `torch.from_numpy(boxes).cuda()` 转为 GPU 张量, 调用 `torchvision.ops.nms`, 结果 `.cpu().numpy()` 回到 CPU。**未使用 `torch.no_grad()`**。 | +| `batch_process_detections` (705-812) | 解析 TensorRT 输出, 按 batch 拆分, 逐个做 YOLO 输出解析 + NMS。每次调用创建新的 NMSProcessor 实例。 | + +**GPU 张量生命周期:** `_process_gpu` 中 `boxes_t` 和 `scores_t` 是临时变量, 函数返回后即可被 GC 回收。`keep` 张量在 `.cpu().numpy()` 后也成为临时变量。无显式释放, 依赖 Python GC + PyTorch 缓存分配器。 + +--- + +## 2. 接口契约 + +### 2.1 方法签名与返回值 + +```python +# main.py +def _handle_detections( + self, camera_id: str, roi, bind, frame: VideoFrame, + boxes: Any, scores: Any, class_ids: Any, scale_info: tuple +) -> None + +# config_sync.py +def get_roi_configs_with_bindings( + self, camera_id: Optional[str] = None, force_refresh: bool = False +) -> List[ROIInfoNew] + +def _init_cloud_redis(self) -> None # 失败时 self._cloud_redis = None + +# screenshot_handler.py +def _handle_request(self, fields: dict) -> None +# fields 预期字段: request_id, camera_code, cos_path, callback_url, device_id(可选), rtsp_url(可选) + +# tensorrt_engine.py +def infer(self, input_batch: np.ndarray) -> Tuple[List[np.ndarray], float] +# input_batch: shape=[batch, 3, 480, 480], dtype=float16 +# returns: (outputs_list, inference_time_ms) + +def load_engine(self, engine_path: Optional[str] = None) -> bool + +# result_reporter.py +def report_alarm(self, alarm_info: AlarmInfo, screenshot: Optional[np.ndarray] = None) -> bool +def report_alarm_resolve(self, resolve_data: dict) -> bool + +# alarm_upload_worker.py +def _process_alarm(self, alarm_json: str) -> None +def _handle_retry(self, alarm_json: str, error: str) -> None +def _upload_snapshot_to_cos(self, image_bytes: bytes, alarm_id: str, device_id: str) -> Optional[str] + +# postprocessor.py +def batch_process_detections( + self, batch_outputs: List[np.ndarray], batch_size: int, + conf_threshold: Optional[float] = None, nms_threshold: Optional[float] = None, + per_item_conf_thresholds: Optional[List[float]] = None +) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]] +``` + +### 2.2 Redis Key 名与格式 + +| Key | Redis实例 | 类型 | 格式 | 模块 | +|-----|-----------|------|------|------| +| `local:alarm:pending` | 本地 | List | JSON(AlarmInfo.to_dict() 或 resolve_data) | result_reporter / alarm_upload_worker | +| `local:alarm:retry` | 本地 | List | JSON(带 _retry_count, _retry_at 字段) | alarm_upload_worker | +| `local:alarm:dead` | 本地 | List | JSON(带 _dead_reason, _dead_at 字段) | alarm_upload_worker | +| `device:{device_id}:config` | 云端 | String | JSON(完整配置) | config_sync | +| `device:{device_id}:version` | 云端 | String | int 版本号 | config_sync | +| `device_config_stream` | 云端 | Stream | {device_id, version, action} | config_sync | +| `local:device:config:current` | 本地 | String | JSON(完整配置) | config_sync | +| `local:device:config:backup` | 本地 | String | JSON(上一版本配置) | config_sync | +| `local:device:config:version` | 本地 | String | int 版本号 | config_sync | +| `local:device:config:stream_last_id` | 本地 | String | Stream message ID | config_sync | +| `edge_snap_request` | 云端 | Stream | {request_id, camera_code, cos_path, callback_url, ...} | screenshot_handler | +| `snap:result:{request_id}` | 云端 | String(TTL=60s) | JSON(降级结果) | screenshot_handler | + +### 2.3 AlarmInfo 数据结构 (完整字段) + +```python +@dataclass +class AlarmInfo: + alarm_id: str # 格式: edge_{device_id}_{YYYYMMDDHHmmss}_{6hex} + alarm_type: str # 算法返回的 alert_type + device_id: str # 实际传入的是 camera_id (非 edge device_id) + scene_id: str # ROI ID + event_time: str # ISO8601 (frame.timestamp) + alarm_level: int # 0=紧急 1=重要 2=普通 3=轻微 + snapshot_b64: Optional[str] # JPEG base64, 由 report_alarm 填充 + algorithm_code: Optional[str] + confidence_score: Optional[float] + ext_data: Optional[Dict] # 包含: duration_ms, roi_id, bbox, target_class, + # bind_id, message, edge_node_id, first_frame_time, + # vehicle_count, area_id +``` + +--- + +## 3. 依赖关系图 + +``` + +-----------------+ + | main.py | + | EdgeInference | + | Service | + +--------+--------+ + | + +------------------+------------------+ + | | | | | + v v v v v + config_sync stream engine postprocess algorithm + (ConfigSync manager manager (PostProc) manager + Manager) | + | | | + v v v + database.py tensorrt_engine.py algorithms/ + (SQLiteManager) (TensorRTEngine) + + +------------------+ + | result_reporter | + | (ResultReporter) | + +--------+---------+ + | + | LPUSH local:alarm:pending + v + +------------------+ + | alarm_upload | + | _worker | + +--------+---------+ + | + +--------+---------+ + | COS Upload | + | HTTP POST cloud | + +------------------+ + + screenshot_handler (独立Redis连接) --> 云端 Redis Stream +``` + +**数据流转:** +1. `_scheduler_worker` 轮询视频流 -> `_process_frame` 获取ROI+预处理 -> 推入 `_batch_roi_queue` +2. `_inference_worker` 消费队列 -> `_batch_process_rois` 做 TensorRT 推理 + 后处理 -> `_handle_detections` +3. `_handle_detections` -> 算法管理器判定 -> 去重过滤 -> `ResultReporter.report_alarm()` LPUSH Redis +4. `AlarmUploadWorker` BRPOP Redis -> base64 decode -> COS upload -> HTTP POST 云端 + +--- + +## 4. 安全边界 + +### 4.1 绝对不能动的代码 (Critical Path) + +| 文件 | 代码区域 | 原因 | +|------|----------|------| +| `tensorrt_engine.py` | `infer()` 的 push/pop/synchronize 顺序 | CUDA context 操作顺序错误会导致段错误或GPU挂死 | +| `tensorrt_engine.py` | `_allocate_buffers()` 的 buffer 分配逻辑 | 改变 buffer 大小会导致推理崩溃 | +| `result_reporter.py` | `AlarmInfo.to_dict()` 的字段名 | alarm_upload_worker 和云端 API 依赖这些字段名 | +| `alarm_upload_worker.py` | Redis key 常量引用 | 必须与 result_reporter 一致 | +| `postprocessor.py` | `_parse_yolo_output` 的输出格式解析 (84行 = 4+80) | 与模型输出格式强耦合 | + +### 4.2 可以安全修改的代码 + +| 文件 | 代码区域 | 注意事项 | +|------|----------|----------| +| `main.py` | `_handle_detections` 的去重逻辑 | 只影响告警频率, 不影响推理管线。但要确保线程安全(当前单线程调用, 无锁)。 | +| `main.py` | `_camera_cooldown_seconds` 默认值 | 可调整, 纯业务参数 | +| `config_sync.py` | `_listen_config_stream` 的重连逻辑 | 注意 `_cloud_redis = None` 的时机 | +| `config_sync.py` | `get_roi_configs_with_bindings` 的 N+1 查询 | 可优化为 JOIN 查询, 但要保持返回值格式不变 | +| `screenshot_handler.py` | `_listen_loop` 的 xack 逻辑 | 可增加重试, 但要注意不能阻塞主循环 | +| `postprocessor.py` | `_process_gpu` 添加 `torch.no_grad()` | 纯优化, 不影响功能 | + +### 4.3 修改时需要同步更新的代码对 + +| 修改点 | 需要同步的位置 | +|--------|----------------| +| `AlarmInfo` 字段变更 | `to_dict()`, `alarm_upload_worker._process_alarm`, 云端API | +| Redis key 名变更 | `result_reporter.py` 常量 + `alarm_upload_worker.py` import | +| `_active_alarms` key 格式变更 | 第865行 resolve 清除逻辑 + 第880行查重逻辑 + 第940行写入 | +| `_camera_alert_cooldown` key 格式变更 | 第888行构建 + 第890行读取 + 第900行写入 | + +--- + +## 5. 已有测试 + +| 测试文件 | 覆盖模块 | 状态 | +|----------|----------|------| +| `tests/test_config_sync.py` | config_sync.py | 存在 | +| `tests/test_postprocessor.py` | postprocessor.py | 存在 | +| `tests/test_result_reporter.py` | result_reporter.py | 存在 | +| `tests/test_tensorrt.py` | tensorrt_engine.py | 存在 | +| `tests/test_preprocessor.py` | preprocessor.py | 存在 | +| `tests/test_video_stream.py` | video_stream.py | 存在 | +| `tests/test_utils.py` | utils | 存在 | +| `test_leave_post_full_workflow.py` | 离岗检测集成 | 存在(项目根目录) | +| `test_vehicle_algorithms.py` | 车辆算法 | 存在(项目根目录) | + +**缺失测试:** +- `main.py` (`_handle_detections`, 去重逻辑) -- **无单元测试** +- `alarm_upload_worker.py` -- **无单元测试** +- `screenshot_handler.py` -- **无单元测试** + +--- + +## 6. 潜在风险 + +### 6.1 Critical (必须关注) + +**[C1] `_handle_detections` 无线程安全保护** +- `_camera_alert_cooldown` 和 `_active_alarms` 两个字典在 `_handle_detections` 中读写, 该方法被 `_inference_worker` 线程调用。当前架构下只有一个推理线程, 所以实际上是单线程安全的。但如果未来增加多推理线程, 将产生竞态条件。 +- 风险等级: 当前低, 架构变更时高 + +**[C2] `_active_alarms` resolve 清除使用遍历+break** +- 第865-868行: `for k, v in list(self._active_alarms.items())` 遍历查找匹配的 alarm_id 后 break。如果同一个 alarm_id 被错误地写入多个 key, 只会清除第一个。 +- 风险等级: 低(alarm_id 是 UUID 级唯一) + +**[C3] CUDA context 与 pycuda.autoinit 共存** +- `import pycuda.autoinit` 在模块加载时创建一个默认 CUDA context。`load_engine` 又创建新的 context。两个 context 共存, 依赖 push/pop 正确切换。如果任何代码在 push/pop 之外使用了 CUDA 操作(如 PostProcessor 的 GPU NMS), 将使用 autoinit 的 context, 与 TensorRT 的 context 不同。 +- 风险等级: 中(当前 NMS 使用 PyTorch CUDA, 与 PyCUDA context 独立) + +### 6.2 Important (应当修复) + +**[I1] `config_sync._listen_config_stream` 通用 Exception 不置 None** +- 第385-390行: 通用 Exception 分支不将 `_cloud_redis` 设为 None, 但 ConnectionError 分支会。如果出现非 ConnectionError 的 Redis 异常(如 ResponseError), 会一直使用同一个可能已损坏的连接重试, 而不是重建连接。 +- 建议: 在通用 Exception 中也加入 `self._cloud_redis = None` 触发重连, 或至少尝试 ping 验证连接健康。 + +**[I2] `get_roi_configs_with_bindings` 的 N+1 查询** +- 当 `camera_id` 为 None 时, 先查所有 ROI, 再逐个查 bindings。ROI 数量多时性能差。 +- 注意: 当 `camera_id` 非空时已经使用了 `get_bindings_by_camera` 优化查询, 这是正确的。 +- 建议: 添加一个 `get_all_bindings()` 方法或使用 JOIN 查询, 一次取出所有 bindings。 + +**[I3] `screenshot_handler._listen_loop` 中 xack 失败静默 pass** +- 第195行: xack 失败时 pass, 不记录日志。导致无法发现 ACK 累积问题, PEL (Pending Entries List) 会持续增长。 +- 建议: 至少记录 warning 日志。 + +**[I4] `_process_gpu` 未使用 `torch.no_grad()`** +- NMS 是纯推理操作, 不需要梯度计算。未包裹 `torch.no_grad()` 会导致不必要的计算图记录和额外内存占用。 +- 建议: 用 `with torch.no_grad():` 包裹 GPU NMS 调用。 + +**[I5] `report_alarm` 中截图 base64 编码在调用线程中执行** +- 第133-143行: JPEG 编码 + base64 编码在推理线程中同步执行。一张 1080p 截图约 100-300KB JPEG, base64 后约 130-400KB。对高频告警场景可能阻塞推理线程。 +- 当前影响: 有 `_camera_cooldown_seconds=30` 限频, 实际影响有限。 + +### 6.3 Suggestions (改进建议) + +**[S1] `_handle_detections` 中 import 在函数内** +- 第914行: `from core.result_reporter import AlarmInfo, generate_alarm_id` 在热路径函数内 import。虽然 Python 会缓存模块, 但每次调用仍有字典查找开销。 +- 建议: 移到文件顶部 import。 + +**[S2] `batch_process_detections` 每次创建新 NMSProcessor** +- 第753行: 每次调用都 `NMSProcessor(nms_threshold, use_gpu=True)`, 而 PostProcessor.__init__ 已经创建了 `self._nms`。 +- 建议: 复用 `self._nms` 或在需要不同阈值时参数化调用。 + +**[S3] `alarm_upload_worker._process_retry_queue` 的 RPOP+LPUSH 非原子** +- 重试队列的检查逻辑: RPOP -> 检查时间 -> LPUSH 回去。在高并发下可能有短暂数据丢失窗口(RPOP 后进程崩溃)。 +- 当前影响: 单线程消费, 实际风险极低。 + +**[S4] `_camera_alert_cooldown` 字典无清理机制** +- 随着时间推移, 已下线的摄像头 + 告警类型的 key 会一直驻留内存。 +- 当前影响: 每个 key 约 100 字节, 不太可能成为问题。长期运行建议定期清理。 + +**[S5] AlarmInfo.device_id 语义歧义** +- `_handle_detections` 第918行传入 `device_id=camera_id`, 但 AlarmInfo 字段名为 `device_id`。云端可能误认为这是边缘设备ID而非摄像头ID。实际的边缘设备ID在 `ext_data.edge_node_id` 中。 +- 建议: 确认云端 API 对此字段的预期, 考虑重命名或补充文档。 + +--- + +## 7. 修复点风险矩阵 + +| 修复目标 | 涉及文件 | 变更范围 | 回归风险 | 测试覆盖 | +|----------|----------|----------|----------|----------| +| 告警去重逻辑优化 | main.py | _handle_detections 方法内部 | 低(不影响推理管线) | **无**(需新增) | +| Redis 重连机制增强 | config_sync.py | _listen_config_stream except块 | 中(影响配置同步) | 有(test_config_sync) | +| N+1 查询优化 | config_sync.py + database.py | get_roi_configs_with_bindings | 中(影响配置读取) | 有(test_config_sync) | +| xack 失败处理 | screenshot_handler.py | _listen_loop finally块 | 低(仅日志增强) | **无**(需新增) | +| GPU NMS 优化 | postprocessor.py | _process_gpu | 低(纯优化) | 有(test_postprocessor) | +| CUDA context 安全 | tensorrt_engine.py | load_engine/infer | 高(GPU操作) | 有(test_tensorrt) | +| 截图编码优化 | result_reporter.py | report_alarm | 中(涉及序列化格式) | 有(test_result_reporter) | + +--- + +## 8. 审查总结 + +**代码整体质量:** 项目架构清晰, 模块职责分明, 关键路径有合理的错误处理和日志输出。Redis 双层架构(云端分发+本地自治)设计合理, 支持离线运行。 + +**主要关注点:** +1. 告警去重字典 `_camera_alert_cooldown` 和 `_active_alarms` 是核心业务逻辑, 修改时务必保持 key 格式和读写位置的一致性。当前无测试覆盖, 是最大风险点。 +2. `config_sync._listen_config_stream` 的通用 Exception 处理策略需要与 ConnectionError 保持一致。 +3. TensorRT 的 CUDA context push/pop 模式是正确的, 但与 `pycuda.autoinit` 共存需要注意不要在 push/pop 之外的代码中使用 PyCUDA 操作。 +4. 截图通过 base64 经 Redis 传递的设计合理(避免文件IO), 但要注意大图场景下的内存和队列压力。 diff --git a/main.py b/main.py index 976d36d..dfc0a05 100644 --- a/main.py +++ b/main.py @@ -92,6 +92,10 @@ class EdgeInferenceService: # ROI级别告警去重:同ROI+同类型未resolve的告警不重复发送 # key: f"{roi_id}_{alert_type}", value: alarm_id self._active_alarms: Dict[str, str] = {} + self._active_alarms_time: Dict[str, datetime] = {} # 活跃告警创建时间 + self._cleanup_counter = 0 + self._cleanup_interval = 100 # 每 100 次 _handle_detections 清理一次 + self._active_alarm_max_age_sec = 3600 # 活跃告警最大存活时间(1小时) self._logger.info("Edge_Inference_Service 初始化开始") @@ -800,6 +804,12 @@ class EdgeInferenceService: ): """处理检测结果 - 算法接管判断权""" try: + # 惰性清理过期去重记录 + self._cleanup_counter += 1 + if self._cleanup_counter >= self._cleanup_interval: + self._cleanup_counter = 0 + self._cleanup_dedup_dicts(frame.timestamp) + if self._algorithm_manager is None: self._logger.warning("算法管理器不可用,跳过算法处理") return @@ -865,6 +875,7 @@ class EdgeInferenceService: for k, v in list(self._active_alarms.items()): if v == resolve_alarm_id: del self._active_alarms[k] + self._active_alarms_time.pop(k, None) self._logger.debug(f"[去重] 活跃告警已清除: {k} -> {resolve_alarm_id}") break @@ -938,6 +949,7 @@ class EdgeInferenceService: # 记录活跃告警(用于 ROI 级去重) self._active_alarms[active_key] = alarm_info.alarm_id + self._active_alarms_time[active_key] = frame.timestamp # 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型) algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) @@ -952,7 +964,32 @@ class EdgeInferenceService: except Exception as e: self._logger.error(f"处理检测结果失败: {e}") - + + def _cleanup_dedup_dicts(self, now: datetime): + """惰性清理过期的去重记录""" + # 清理 _camera_alert_cooldown 中已过冷却期的记录 + expired_cooldown = [ + k for k, v in self._camera_alert_cooldown.items() + if (now - v).total_seconds() > self._camera_cooldown_seconds * 2 + ] + for k in expired_cooldown: + del self._camera_alert_cooldown[k] + + # 清理 _active_alarms 中可能因 resolve 丢失而残留的记录 + expired_active = [ + k for k, t in self._active_alarms_time.items() + if (now - t).total_seconds() > self._active_alarm_max_age_sec + ] + for k in expired_active: + self._active_alarms.pop(k, None) + self._active_alarms_time.pop(k, None) + self._logger.warning(f"[去重] 活跃告警超时清除: {k}") + + if expired_cooldown or expired_active: + self._logger.debug( + f"[去重] 清理完成: cooldown={len(expired_cooldown)}, active={len(expired_active)}" + ) + def _inference_worker(self): """推理线程:攒批窗口内收集 ROI 请求,批量推理""" while not self._stop_event.is_set():