diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b763b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +CHANGELOG.md diff --git a/edge_inference_service/__init__.py b/__init__.py similarity index 100% rename from edge_inference_service/__init__.py rename to __init__.py diff --git a/edge_inference_service/build_engine.py b/build_engine.py similarity index 100% rename from edge_inference_service/build_engine.py rename to build_engine.py diff --git a/edge_inference_service/config/__init__.py b/config/__init__.py similarity index 100% rename from edge_inference_service/config/__init__.py rename to config/__init__.py diff --git a/edge_inference_service/config/__pycache__/__init__.cpython-310.pyc b/config/__pycache__/__init__.cpython-310.pyc similarity index 100% rename from edge_inference_service/config/__pycache__/__init__.cpython-310.pyc rename to config/__pycache__/__init__.cpython-310.pyc diff --git a/edge_inference_service/config/__pycache__/config_models.cpython-310.pyc b/config/__pycache__/config_models.cpython-310.pyc similarity index 100% rename from edge_inference_service/config/__pycache__/config_models.cpython-310.pyc rename to config/__pycache__/config_models.cpython-310.pyc diff --git a/edge_inference_service/config/__pycache__/database.cpython-310.pyc b/config/__pycache__/database.cpython-310.pyc similarity index 100% rename from edge_inference_service/config/__pycache__/database.cpython-310.pyc rename to config/__pycache__/database.cpython-310.pyc diff --git a/edge_inference_service/config/__pycache__/settings.cpython-310.pyc b/config/__pycache__/settings.cpython-310.pyc similarity index 100% rename from edge_inference_service/config/__pycache__/settings.cpython-310.pyc rename to config/__pycache__/settings.cpython-310.pyc diff --git a/edge_inference_service/config/config_models.py b/config/config_models.py similarity index 100% rename from edge_inference_service/config/config_models.py rename to config/config_models.py diff --git a/edge_inference_service/config/database.py b/config/database.py similarity index 100% rename from edge_inference_service/config/database.py rename to config/database.py diff --git a/edge_inference_service/config/settings.py b/config/settings.py similarity index 100% rename from edge_inference_service/config/settings.py rename to config/settings.py diff --git a/edge_inference_service/core/__init__.py b/core/__init__.py similarity index 100% rename from edge_inference_service/core/__init__.py rename to core/__init__.py diff --git a/edge_inference_service/core/__pycache__/__init__.cpython-310.pyc b/core/__pycache__/__init__.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/__init__.cpython-310.pyc rename to core/__pycache__/__init__.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/config_sync.cpython-310.pyc b/core/__pycache__/config_sync.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/config_sync.cpython-310.pyc rename to core/__pycache__/config_sync.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/postprocessor.cpython-310.pyc b/core/__pycache__/postprocessor.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/postprocessor.cpython-310.pyc rename to core/__pycache__/postprocessor.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/preprocessor.cpython-310.pyc b/core/__pycache__/preprocessor.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/preprocessor.cpython-310.pyc rename to core/__pycache__/preprocessor.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/result_reporter.cpython-310.pyc b/core/__pycache__/result_reporter.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/result_reporter.cpython-310.pyc rename to core/__pycache__/result_reporter.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/tensorrt_engine.cpython-310.pyc b/core/__pycache__/tensorrt_engine.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/tensorrt_engine.cpython-310.pyc rename to core/__pycache__/tensorrt_engine.cpython-310.pyc diff --git a/edge_inference_service/core/__pycache__/video_stream.cpython-310.pyc b/core/__pycache__/video_stream.cpython-310.pyc similarity index 100% rename from edge_inference_service/core/__pycache__/video_stream.cpython-310.pyc rename to core/__pycache__/video_stream.cpython-310.pyc diff --git a/edge_inference_service/core/config_sync.py b/core/config_sync.py similarity index 100% rename from edge_inference_service/core/config_sync.py rename to core/config_sync.py diff --git a/edge_inference_service/core/postprocessor.py b/core/postprocessor.py similarity index 100% rename from edge_inference_service/core/postprocessor.py rename to core/postprocessor.py diff --git a/edge_inference_service/core/preprocessor.py b/core/preprocessor.py similarity index 100% rename from edge_inference_service/core/preprocessor.py rename to core/preprocessor.py diff --git a/edge_inference_service/core/result_reporter.py b/core/result_reporter.py similarity index 100% rename from edge_inference_service/core/result_reporter.py rename to core/result_reporter.py diff --git a/edge_inference_service/core/tensorrt_engine.py b/core/tensorrt_engine.py similarity index 100% rename from edge_inference_service/core/tensorrt_engine.py rename to core/tensorrt_engine.py diff --git a/edge_inference_service/core/video_stream.py b/core/video_stream.py similarity index 100% rename from edge_inference_service/core/video_stream.py rename to core/video_stream.py diff --git a/edge_inference_service/CHANGELOG.md b/edge_inference_service/CHANGELOG.md deleted file mode 100644 index e5056c5..0000000 --- a/edge_inference_service/CHANGELOG.md +++ /dev/null @@ -1,205 +0,0 @@ -# CHANGELOG - Edge_Inference_Service - - -### v1.0.0 -**更新时间**: 2026-01-29 18:07:35 -**更新类型**: 启动 -**更新人员**: 系统 -**影响范围**: 全局 - -- Edge_Inference_Service 启动运行 - ---- -## 版本更新记录 - -### v1.0.0 -**更新时间**: 2024-01-29 10:00:00 -**更新类型**: 初始化 -**更新人员**: AI Edge Architecture Team -**影响范围**: 全局 - -#### 功能新增 -- 项目初始化创建,搭建完整的边缘AI推理服务框架 -- 实现6大核心模块的工业级代码 - -#### 模块详情 - -**1. 配置同步模块 (config/)** -- settings.py: 全局配置管理,支持环境变量和 dataclass 配置 -- database.py: MySQL 连接池管理,支持 camera_info、roi_config、alert_records 表 -- config_models.py: 数据模型定义,包含 CameraInfo、ROIInfo、AlertInfo 等 - -**2. 核心处理模块 (core/)** -- config_sync.py: 配置同步管理器,支持 MySQL 读取、Redis Pub/Sub 订阅、配置缓存 -- video_stream.py: RTSP 流读取器,支持多线程、动态抽帧、断线重连(指数退避) -- preprocessor.py: 图像预处理流水线,支持 ROI 裁剪、Letterbox、Batch 打包 -- tensorrt_engine.py: TensorRT 引擎管理,支持异步推理、显存优化、性能监控 -- postprocessor.py: 后处理模块,NMS 算法、坐标映射、告警状态机 -- result_reporter.py: 结果上报模块,MQTT 客户端、告警存储、重试机制 - -**3. 工具类模块 (utils/)** -- logger.py: 分级日志系统,支持性能指标记录 -- common.py: 公共工具函数,包括重试机制、Base64 编解码等 -- version_control.py: 版本控制,记录代码更新历史 - -**4. 测试模块 (tests/)** -- test_config_sync.py: 配置模块单元测试 -- test_video_stream.py: 视频流模块单元测试 -- test_preprocessor.py: 预处理模块单元测试 -- test_postprocessor.py: 后处理模块单元测试 - -#### 技术特性 -- 多线程/多进程架构设计 -- 完善的异常处理机制 -- 显存管理与优化 -- 异步推理模式 -- 配置动态刷新 -- 心跳上报机制 -- 性能监控与日志 - -#### 代码质量 -- 遵循 PEP 8 编码规范 -- 详细中文注释 -- 模块化设计,高内聚低耦合 -- 单元测试框架 - -#### 依赖环境 -- Python 3.8+ -- CUDA 12.1 + cuDNN 8.9.7 + TensorRT 8.6.1 -- opencv-python, ultralytics, pycuda, paho-mqtt, sqlalchemy, redis - ---- - -### v1.0.1 -**更新时间**: 2024-01-29 10:30:00 -**更新类型**: 优化 -**更新人员**: AI Edge Architecture Team -**影响范围**: requirements.txt - -#### 依赖版本优化 -- 更新 requirements.txt,选择最稳定版本而非最新版本 -- 确保环境一致性,减少因版本兼容性问题导致的故障 - -#### 依赖版本清单 - -| 依赖 | 版本 | 选择理由 | -|------|------|---------| -| opencv-python | 4.8.0 | 4.8系列首个稳定版,广泛验证 | -| numpy | 1.24.0 | Python 3.8-3.11完美兼容 | -| paho-mqtt | 1.6.1 | 1.x最终稳定版,企业级 | -| sqlalchemy | 2.0.23 | 2.0系列LTS版 | -| pymysql | 1.1.0 | 成熟稳定版本 | -| redis | 4.6.0 | 4.x最终稳定版 | -| pyyaml | 6.0.1 | 安全稳定版 | -| pytest | 7.4.4 | 7.x最终稳定版 | -| pytest-cov | 4.1.0 | 成熟稳定版 | - -#### 可选依赖(按需安装) -- tensorrt==8.6.1.6 (CUDA 12.1专用) -- pycuda==2023.1.1 (稳定版) -- ultralytics==8.1.0 (YOLO11 LTS版) - ---- - -### v1.0.2 -**更新时间**: 2024-01-29 11:00:00 -**更新类型**: 修复 -**更新人员**: AI Edge Architecture Team -**影响范围**: result_reporter.py - -#### Bug修复 -- 修复 result_reporter.py 中的5处语法错误 -- 问题:将 `self._lock:` 误写为锁语句,正确形式为 `with self._lock:` -- 修复缺失的 numpy 导入:AlertReporter.report_alert() 方法使用了 `np.ndarray` 类型提示但未导入 numpy - ---- - -### v1.0.3 -**更新时间**: 2024-01-29 11:30:00 -**更新类型**: 优化 -**更新人员**: AI Edge Architecture Team -**影响范围**: database.py, main.py, config_sync.py - -#### 功能优化 -- 实现数据库连接可选模式 -- 当MySQL不可用时,服务仍可正常运行(无数据库模式) -- 添加数据库可用性检测,不阻塞服务启动 -- 所有数据库操作方法添加了不可用时的优雅降级处理 - -#### 技术细节 -- DatabaseManager添加 `_available` 标志 -- get_session() 方法支持空会话返回 -- get_camera_info()、get_roi_configs() 等方法返回空列表 -- save_alert_record() 等方法返回 False - ---- - -### v1.0.4 -**更新时间**: 2024-01-29 12:00:00 -**更新类型**: 新增 -**更新人员**: AI Edge Architecture Team -**影响范围**: build_engine.py - -#### 功能新增 -- 新增 TensorRT Engine 生成脚本 build_engine.py -- 支持将 YOLO11 模型转换为 TensorRT Engine -- 支持 FP16 精度、480×480 输入、动态 Batch(1-8) -- 支持 opt_level=4 最大优化 -- 内置模型下载功能 - -#### 使用方法 - -**方式1: 使用本地 PyTorch 模型** -```bash -python build_engine.py --model yolo11n.pt --output models/yolo11n.engine --fp16 --batch 8 -``` - -**方式2: 下载 YOLO11 模型并转换** -```bash -python build_engine.py --download yolo11n --output models/yolo11n.engine --fp16 --batch 8 -``` - -**方式3: 使用已有 ONNX 模型** -```bash -python build_engine.py --model model.onnx --output models/yolo11n.engine --fp16 --batch 8 -``` - -#### 参数说明 - -| 参数 | 说明 | 默认值 | -|------|------|--------| -| --model | 输入模型路径 (.pt/.onnx) | yolo11n.pt | -| --output | 输出 engine 路径 | models/yolo11n.engine | -| --input-size | 输入图像尺寸 | 480 | -| --batch | 最大 batch 大小 | 8 | -| --fp16 | 启用 FP16 精度 | True | -| --opt-level | 优化级别 (1-4) | 4 | -| --download | 下载模型 (yolo11n/s/m/l/x) | None | - ---- - -### v1.0.5 -**更新时间**: 2026-01-29 12:30:00 -**更新类型**: 修改 -**更新人员**: AI Edge Architecture Team -**影响范围**: 全局 - -#### 模型迁移 -- 将默认模型从 YOLOv8 迁移至 YOLO11 -- 默认模型变更为 yolo11n(更轻量、更高效) - -#### 修改内容 -| 文件 | 修改项 | -|------|--------| -| build_engine.py | 默认模型、帮助文本、示例命令 | -| config/settings.py | 默认引擎路径 | -| __init__.py | 使用说明 | -| CHANGELOG.md | 版本记录、参数说明 | - -#### 新默认配置 -- 模型名称: yolo11n -- 引擎路径: ./models/yolo11n.engine -- ultralytics 版本: 8.1.0 - ---- -*Generated by Edge_Inference_Service Version Control System* diff --git a/edge_inference_service/main.py b/edge_inference_service/main.py deleted file mode 100644 index fdc821b..0000000 --- a/edge_inference_service/main.py +++ /dev/null @@ -1,412 +0,0 @@ -""" -主入口模块 -整合所有模块,启动推理服务 -""" - -import logging -import os -import sys -import threading -import signal -import time -from datetime import datetime -from typing import Dict, Any, Optional - -from config.settings import get_settings, Settings -from config.database import init_database -from core.config_sync import get_config_sync_manager, ConfigSyncManager -from core.video_stream import MultiStreamManager, VideoFrame -from core.preprocessor import ImagePreprocessor -from core.tensorrt_engine import TensorRTEngine, EngineManager -from core.postprocessor import PostProcessor -from core.result_reporter import ResultReporter -from utils.logger import get_logger, StructuredLogger -from utils.version_control import get_version_control - -logger = logging.getLogger(__name__) - - -class EdgeInferenceService: - """边缘推理服务主类 - - 整合所有模块,提供完整的推理服务 - """ - - def __init__(self): - self._running = False - self._settings = get_settings() - self._logger = get_logger("main") - self._version_control = get_version_control() - - self._db_manager = None - self._config_manager: Optional[ConfigSyncManager] = None - self._stream_manager: Optional[MultiStreamManager] = None - self._preprocessor: Optional[ImagePreprocessor] = None - self._engine_manager: Optional[EngineManager] = None - self._postprocessor: Optional[PostProcessor] = None - self._reporter: Optional[ResultReporter] = None - - self._processing_threads: Dict[str, threading.Thread] = {} - self._stop_event = threading.Event() - - self._performance_stats = { - "start_time": None, - "total_frames_processed": 0, - "total_alerts_generated": 0, - "uptime_seconds": 0, - } - - self._logger.info("Edge_Inference_Service 初始化开始") - - def _init_database(self): - """初始化数据库""" - try: - from config.database import DatabaseManager - self._db_manager = DatabaseManager() - if self._db_manager.is_available: - self._logger.info("数据库初始化成功") - else: - self._logger.warning("数据库不可见,服务将在无数据库模式下运行") - except Exception as e: - self._logger.warning(f"数据库初始化失败,服务将在无数据库模式下运行: {e}") - self._db_manager = None - - def _init_config_manager(self): - """初始化配置管理器""" - try: - self._config_manager = get_config_sync_manager() - self._config_manager.start_config_subscription() - self._logger.info("配置管理器初始化成功") - except Exception as e: - self._logger.error(f"配置管理器初始化失败: {e}") - raise - - def _init_stream_manager(self): - """初始化流管理器""" - self._stream_manager = MultiStreamManager() - self._logger.info("流管理器初始化成功") - - def _init_preprocessor(self): - """初始化预处理器""" - self._preprocessor = ImagePreprocessor() - self._logger.info("预处理器初始化成功") - - def _init_engine(self): - """初始化推理引擎""" - try: - self._engine_manager = EngineManager() - - engine_path = self._settings.inference.model_path - if os.path.exists(engine_path): - self._engine_manager.load_engine("default", engine_path) - self._logger.info(f"推理引擎加载成功: {engine_path}") - else: - self._logger.warning(f"引擎文件不存在: {engine_path}") - - except Exception as e: - self._logger.error(f"推理引擎初始化失败: {e}") - raise - - def _init_postprocessor(self): - """初始化后处理器""" - self._postprocessor = PostProcessor() - self._logger.info("后处理器初始化成功") - - def _init_reporter(self): - """初始化结果上报器""" - self._reporter = ResultReporter() - self._logger.info("结果上报器初始化成功") - - def initialize(self): - """初始化所有组件""" - self._logger.info("=" * 50) - self._logger.info("Edge_Inference_Service 启动") - self._logger.info("=" * 50) - - self._init_database() - self._init_config_manager() - self._init_stream_manager() - self._init_preprocessor() - self._init_engine() - self._init_postprocessor() - self._init_reporter() - - self._performance_stats["start_time"] = datetime.now() - - self._version_control.record_update( - version="1.0.0", - update_type="启动", - description="Edge_Inference_Service 启动运行", - updated_by="系统", - affected_items=["全局"], - ) - - self._logger.info("所有组件初始化完成") - - def _load_cameras(self): - """加载摄像头配置""" - cameras = self._config_manager.get_cameras() - - for camera in cameras: - try: - self._stream_manager.add_stream( - camera_id=camera.camera_id, - rtsp_url=camera.rtsp_url, - target_fps=self._settings.video_stream.default_fps, - on_frame_callback=self._create_frame_callback(camera.camera_id) - ) - self._logger.info(f"已添加摄像头: {camera.camera_id}") - except Exception as e: - self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}") - - def _create_frame_callback(self, camera_id: str): - """创建帧处理回调""" - def callback(frame): - self._process_frame(camera_id, frame) - return callback - - def _process_frame(self, camera_id: str, frame: VideoFrame): - """处理视频帧""" - try: - start_time = time.perf_counter() - - roi_configs = self._config_manager.get_roi_configs(camera_id) - - for roi in roi_configs: - self._process_roi_frame(camera_id, frame, roi) - - processing_time_ms = (time.perf_counter() - start_time) * 1000 - - self._performance_stats["total_frames_processed"] += 1 - - self._logger.log_inference_latency( - processing_time_ms, - batch_size=1 - ) - - except Exception as e: - self._logger.error(f"处理帧失败 {camera_id}: {e}") - - def _process_roi_frame( - self, - camera_id: str, - frame: VideoFrame, - roi - ): - """处理ROI帧""" - try: - cropped = self._preprocessor.preprocess_single(frame.image, roi) - - processed_image, scale_info = cropped - - batch_data = self._preprocessor._batch_preprocessor._stack_and_normalize( - [processed_image] - ) - - engine = self._engine_manager.get_engine("default") - if engine is None: - return - - outputs, inference_time_ms = engine.infer(batch_data) - - boxes, scores, class_ids = self._postprocessor.process_detections( - outputs, - conf_threshold=self._settings.inference.conf_threshold - ) - - if len(boxes) > 0: - self._handle_detections( - camera_id, roi.roi_id, frame, - boxes, scores, class_ids, - scale_info - ) - - except Exception as e: - self._logger.error(f"处理ROI帧失败: {e}") - - def _handle_detections( - self, - camera_id: str, - roi_id: str, - frame: VideoFrame, - boxes: any, - scores: any, - class_ids: any, - scale_info: tuple - ): - """处理检测结果""" - try: - from config.config_models import AlgorithmType - - mapped_boxes = self._postprocessor.map_coordinates( - boxes, scale_info, - (frame.width, frame.height) - ) - - for i, box in enumerate(mapped_boxes): - detection_result = { - "class_id": int(class_ids[i]) if len(class_ids) > 0 else 0, - "confidence": float(scores[i]), - "bbox": box, - } - - alert_result = self._postprocessor.check_alarm_condition( - roi_id, True, frame.timestamp - ) - - if alert_result["should_alert"]: - self._performance_stats["total_alerts_generated"] += 1 - - screenshot = frame.image - - self._reporter.report_detection_alert( - camera_id=camera_id, - roi_id=roi_id, - alert_type="detection", - detection={ - "class_name": f"class_{detection_result['class_id']}", - "confidence": detection_result["confidence"], - "bbox": detection_result["bbox"], - "message": f"检测到目标" - }, - screenshot=screenshot - ) - - self._logger.log_alert( - "detection", - camera_id, - roi_id, - detection_result["confidence"] - ) - - except Exception as e: - self._logger.error(f"处理检测结果失败: {e}") - - def start(self): - """启动服务""" - if self._running: - return - - self._running = True - self._stop_event.clear() - - self._load_cameras() - - self._stream_manager.start_all() - - self._logger.info("Edge_Inference_Service 已启动") - - self._start_heartbeat_thread() - - self._register_signal_handlers() - - self._wait_for_shutdown() - - def _start_heartbeat_thread(self): - """启动心跳线程""" - def heartbeat(): - while not self._stop_event.is_set(): - try: - uptime = (datetime.now() - self._performance_stats["start_time"]).total_seconds() - self._performance_stats["uptime_seconds"] = uptime - - status = { - "running": True, - "uptime_seconds": uptime, - "frames_processed": self._performance_stats["total_frames_processed"], - "alerts_generated": self._performance_stats["total_alerts_generated"], - "stream_stats": self._stream_manager.get_statistics() if self._stream_manager else {}, - } - - if self._reporter: - self._reporter.report_heartbeat("edge_inference_device", status) - - except Exception as e: - self._logger.error(f"心跳上报失败: {e}") - - time.sleep(30) - - thread = threading.Thread(target=heartbeat, name="Heartbeat", daemon=True) - thread.start() - - def _register_signal_handlers(self): - """注册信号处理器""" - def handle_signal(signum, frame): - self._logger.info(f"收到信号 {signum}, 正在停止服务...") - self.stop() - - signal.signal(signal.SIGINT, handle_signal) - signal.signal(signal.SIGTERM, handle_signal) - - def _wait_for_shutdown(self): - """等待关闭信号""" - while not self._stop_event.is_set(): - time.sleep(1) - - def stop(self): - """停止服务""" - if not self._running: - return - - self._running = False - self._stop_event.set() - - if self._stream_manager: - self._stream_manager.stop_all() - self._stream_manager.close() - - if self._engine_manager: - self._engine_manager.release_all() - - if self._config_manager: - self._config_manager.stop_config_subscription() - self._config_manager.close() - - if self._reporter: - self._reporter.close() - - self._performance_stats["uptime_seconds"] = ( - (datetime.now() - self._performance_stats["start_time"]).total_seconds() - ) - - self._logger.info("Edge_Inference_Service 已停止") - self._logger.info(f"运行统计: {self._performance_stats}") - - def get_status(self) -> Dict[str, Any]: - """获取服务状态""" - return { - "running": self._running, - "start_time": ( - self._performance_stats["start_time"].isoformat() - if self._performance_stats["start_time"] else None - ), - "uptime_seconds": self._performance_stats["uptime_seconds"], - "total_frames_processed": self._performance_stats["total_frames_processed"], - "total_alerts_generated": self._performance_stats["total_alerts_generated"], - "stream_manager": ( - self._stream_manager.get_statistics() - if self._stream_manager else {} - ), - "config_version": ( - self._config_manager.config_version - if self._config_manager else None - ), - } - - -def main(): - """主函数入口""" - service = EdgeInferenceService() - - try: - service.initialize() - service.start() - except KeyboardInterrupt: - service.stop() - except Exception as e: - logger.error(f"服务异常: {e}") - raise - - -if __name__ == "__main__": - main() diff --git a/edge_inference_service/yolo11n.pt b/edge_inference_service/yolo11n.pt deleted file mode 100644 index 45b273b..0000000 Binary files a/edge_inference_service/yolo11n.pt and /dev/null differ diff --git a/edge_inference_service/logs/main.log b/logs/main.log similarity index 100% rename from edge_inference_service/logs/main.log rename to logs/main.log diff --git a/edge_inference_service/logs/main_error.log b/logs/main_error.log similarity index 100% rename from edge_inference_service/logs/main_error.log rename to logs/main_error.log diff --git a/main.py b/main.py index eb389a0..fdc821b 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,412 @@ -# 这是一个示例 Python 脚本。 +""" +主入口模块 +整合所有模块,启动推理服务 +""" -# 按 Shift+F10 执行或将其替换为您的代码。 -# 按 双击 Shift 在所有地方搜索类、文件、工具窗口、操作和设置。 +import logging +import os +import sys +import threading +import signal +import time +from datetime import datetime +from typing import Dict, Any, Optional + +from config.settings import get_settings, Settings +from config.database import init_database +from core.config_sync import get_config_sync_manager, ConfigSyncManager +from core.video_stream import MultiStreamManager, VideoFrame +from core.preprocessor import ImagePreprocessor +from core.tensorrt_engine import TensorRTEngine, EngineManager +from core.postprocessor import PostProcessor +from core.result_reporter import ResultReporter +from utils.logger import get_logger, StructuredLogger +from utils.version_control import get_version_control + +logger = logging.getLogger(__name__) -def print_hi(name): - # 在下面的代码行中使用断点来调试脚本。 - print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 +class EdgeInferenceService: + """边缘推理服务主类 + + 整合所有模块,提供完整的推理服务 + """ + + def __init__(self): + self._running = False + self._settings = get_settings() + self._logger = get_logger("main") + self._version_control = get_version_control() + + self._db_manager = None + self._config_manager: Optional[ConfigSyncManager] = None + self._stream_manager: Optional[MultiStreamManager] = None + self._preprocessor: Optional[ImagePreprocessor] = None + self._engine_manager: Optional[EngineManager] = None + self._postprocessor: Optional[PostProcessor] = None + self._reporter: Optional[ResultReporter] = None + + self._processing_threads: Dict[str, threading.Thread] = {} + self._stop_event = threading.Event() + + self._performance_stats = { + "start_time": None, + "total_frames_processed": 0, + "total_alerts_generated": 0, + "uptime_seconds": 0, + } + + self._logger.info("Edge_Inference_Service 初始化开始") + + def _init_database(self): + """初始化数据库""" + try: + from config.database import DatabaseManager + self._db_manager = DatabaseManager() + if self._db_manager.is_available: + self._logger.info("数据库初始化成功") + else: + self._logger.warning("数据库不可见,服务将在无数据库模式下运行") + except Exception as e: + self._logger.warning(f"数据库初始化失败,服务将在无数据库模式下运行: {e}") + self._db_manager = None + + def _init_config_manager(self): + """初始化配置管理器""" + try: + self._config_manager = get_config_sync_manager() + self._config_manager.start_config_subscription() + self._logger.info("配置管理器初始化成功") + except Exception as e: + self._logger.error(f"配置管理器初始化失败: {e}") + raise + + def _init_stream_manager(self): + """初始化流管理器""" + self._stream_manager = MultiStreamManager() + self._logger.info("流管理器初始化成功") + + def _init_preprocessor(self): + """初始化预处理器""" + self._preprocessor = ImagePreprocessor() + self._logger.info("预处理器初始化成功") + + def _init_engine(self): + """初始化推理引擎""" + try: + self._engine_manager = EngineManager() + + engine_path = self._settings.inference.model_path + if os.path.exists(engine_path): + self._engine_manager.load_engine("default", engine_path) + self._logger.info(f"推理引擎加载成功: {engine_path}") + else: + self._logger.warning(f"引擎文件不存在: {engine_path}") + + except Exception as e: + self._logger.error(f"推理引擎初始化失败: {e}") + raise + + def _init_postprocessor(self): + """初始化后处理器""" + self._postprocessor = PostProcessor() + self._logger.info("后处理器初始化成功") + + def _init_reporter(self): + """初始化结果上报器""" + self._reporter = ResultReporter() + self._logger.info("结果上报器初始化成功") + + def initialize(self): + """初始化所有组件""" + self._logger.info("=" * 50) + self._logger.info("Edge_Inference_Service 启动") + self._logger.info("=" * 50) + + self._init_database() + self._init_config_manager() + self._init_stream_manager() + self._init_preprocessor() + self._init_engine() + self._init_postprocessor() + self._init_reporter() + + self._performance_stats["start_time"] = datetime.now() + + self._version_control.record_update( + version="1.0.0", + update_type="启动", + description="Edge_Inference_Service 启动运行", + updated_by="系统", + affected_items=["全局"], + ) + + self._logger.info("所有组件初始化完成") + + def _load_cameras(self): + """加载摄像头配置""" + cameras = self._config_manager.get_cameras() + + for camera in cameras: + try: + self._stream_manager.add_stream( + camera_id=camera.camera_id, + rtsp_url=camera.rtsp_url, + target_fps=self._settings.video_stream.default_fps, + on_frame_callback=self._create_frame_callback(camera.camera_id) + ) + self._logger.info(f"已添加摄像头: {camera.camera_id}") + except Exception as e: + self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}") + + def _create_frame_callback(self, camera_id: str): + """创建帧处理回调""" + def callback(frame): + self._process_frame(camera_id, frame) + return callback + + def _process_frame(self, camera_id: str, frame: VideoFrame): + """处理视频帧""" + try: + start_time = time.perf_counter() + + roi_configs = self._config_manager.get_roi_configs(camera_id) + + for roi in roi_configs: + self._process_roi_frame(camera_id, frame, roi) + + processing_time_ms = (time.perf_counter() - start_time) * 1000 + + self._performance_stats["total_frames_processed"] += 1 + + self._logger.log_inference_latency( + processing_time_ms, + batch_size=1 + ) + + except Exception as e: + self._logger.error(f"处理帧失败 {camera_id}: {e}") + + def _process_roi_frame( + self, + camera_id: str, + frame: VideoFrame, + roi + ): + """处理ROI帧""" + try: + cropped = self._preprocessor.preprocess_single(frame.image, roi) + + processed_image, scale_info = cropped + + batch_data = self._preprocessor._batch_preprocessor._stack_and_normalize( + [processed_image] + ) + + engine = self._engine_manager.get_engine("default") + if engine is None: + return + + outputs, inference_time_ms = engine.infer(batch_data) + + boxes, scores, class_ids = self._postprocessor.process_detections( + outputs, + conf_threshold=self._settings.inference.conf_threshold + ) + + if len(boxes) > 0: + self._handle_detections( + camera_id, roi.roi_id, frame, + boxes, scores, class_ids, + scale_info + ) + + except Exception as e: + self._logger.error(f"处理ROI帧失败: {e}") + + def _handle_detections( + self, + camera_id: str, + roi_id: str, + frame: VideoFrame, + boxes: any, + scores: any, + class_ids: any, + scale_info: tuple + ): + """处理检测结果""" + try: + from config.config_models import AlgorithmType + + mapped_boxes = self._postprocessor.map_coordinates( + boxes, scale_info, + (frame.width, frame.height) + ) + + for i, box in enumerate(mapped_boxes): + detection_result = { + "class_id": int(class_ids[i]) if len(class_ids) > 0 else 0, + "confidence": float(scores[i]), + "bbox": box, + } + + alert_result = self._postprocessor.check_alarm_condition( + roi_id, True, frame.timestamp + ) + + if alert_result["should_alert"]: + self._performance_stats["total_alerts_generated"] += 1 + + screenshot = frame.image + + self._reporter.report_detection_alert( + camera_id=camera_id, + roi_id=roi_id, + alert_type="detection", + detection={ + "class_name": f"class_{detection_result['class_id']}", + "confidence": detection_result["confidence"], + "bbox": detection_result["bbox"], + "message": f"检测到目标" + }, + screenshot=screenshot + ) + + self._logger.log_alert( + "detection", + camera_id, + roi_id, + detection_result["confidence"] + ) + + except Exception as e: + self._logger.error(f"处理检测结果失败: {e}") + + def start(self): + """启动服务""" + if self._running: + return + + self._running = True + self._stop_event.clear() + + self._load_cameras() + + self._stream_manager.start_all() + + self._logger.info("Edge_Inference_Service 已启动") + + self._start_heartbeat_thread() + + self._register_signal_handlers() + + self._wait_for_shutdown() + + def _start_heartbeat_thread(self): + """启动心跳线程""" + def heartbeat(): + while not self._stop_event.is_set(): + try: + uptime = (datetime.now() - self._performance_stats["start_time"]).total_seconds() + self._performance_stats["uptime_seconds"] = uptime + + status = { + "running": True, + "uptime_seconds": uptime, + "frames_processed": self._performance_stats["total_frames_processed"], + "alerts_generated": self._performance_stats["total_alerts_generated"], + "stream_stats": self._stream_manager.get_statistics() if self._stream_manager else {}, + } + + if self._reporter: + self._reporter.report_heartbeat("edge_inference_device", status) + + except Exception as e: + self._logger.error(f"心跳上报失败: {e}") + + time.sleep(30) + + thread = threading.Thread(target=heartbeat, name="Heartbeat", daemon=True) + thread.start() + + def _register_signal_handlers(self): + """注册信号处理器""" + def handle_signal(signum, frame): + self._logger.info(f"收到信号 {signum}, 正在停止服务...") + self.stop() + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + def _wait_for_shutdown(self): + """等待关闭信号""" + while not self._stop_event.is_set(): + time.sleep(1) + + def stop(self): + """停止服务""" + if not self._running: + return + + self._running = False + self._stop_event.set() + + if self._stream_manager: + self._stream_manager.stop_all() + self._stream_manager.close() + + if self._engine_manager: + self._engine_manager.release_all() + + if self._config_manager: + self._config_manager.stop_config_subscription() + self._config_manager.close() + + if self._reporter: + self._reporter.close() + + self._performance_stats["uptime_seconds"] = ( + (datetime.now() - self._performance_stats["start_time"]).total_seconds() + ) + + self._logger.info("Edge_Inference_Service 已停止") + self._logger.info(f"运行统计: {self._performance_stats}") + + def get_status(self) -> Dict[str, Any]: + """获取服务状态""" + return { + "running": self._running, + "start_time": ( + self._performance_stats["start_time"].isoformat() + if self._performance_stats["start_time"] else None + ), + "uptime_seconds": self._performance_stats["uptime_seconds"], + "total_frames_processed": self._performance_stats["total_frames_processed"], + "total_alerts_generated": self._performance_stats["total_alerts_generated"], + "stream_manager": ( + self._stream_manager.get_statistics() + if self._stream_manager else {} + ), + "config_version": ( + self._config_manager.config_version + if self._config_manager else None + ), + } -# 按装订区域中的绿色按钮以运行脚本。 -if __name__ == '__main__': - print_hi('PyCharm') +def main(): + """主函数入口""" + service = EdgeInferenceService() + + try: + service.initialize() + service.start() + except KeyboardInterrupt: + service.stop() + except Exception as e: + logger.error(f"服务异常: {e}") + raise -# 访问 https://www.jetbrains.com/help/pycharm/ 获取 PyCharm 帮助 + +if __name__ == "__main__": + main() diff --git a/edge_inference_service/models/yolo11n.onnx b/models/yolo11n.onnx similarity index 100% rename from edge_inference_service/models/yolo11n.onnx rename to models/yolo11n.onnx diff --git a/edge_inference_service/models/yolo11n.pt b/models/yolo11n.pt similarity index 100% rename from edge_inference_service/models/yolo11n.pt rename to models/yolo11n.pt diff --git a/edge_inference_service/requirements.txt b/requirements.txt similarity index 100% rename from edge_inference_service/requirements.txt rename to requirements.txt diff --git a/edge_inference_service/tests/__init__.py b/tests/__init__.py similarity index 100% rename from edge_inference_service/tests/__init__.py rename to tests/__init__.py diff --git a/edge_inference_service/tests/test_config_sync.py b/tests/test_config_sync.py similarity index 100% rename from edge_inference_service/tests/test_config_sync.py rename to tests/test_config_sync.py diff --git a/edge_inference_service/tests/test_postprocessor.py b/tests/test_postprocessor.py similarity index 100% rename from edge_inference_service/tests/test_postprocessor.py rename to tests/test_postprocessor.py diff --git a/edge_inference_service/tests/test_preprocessor.py b/tests/test_preprocessor.py similarity index 100% rename from edge_inference_service/tests/test_preprocessor.py rename to tests/test_preprocessor.py diff --git a/edge_inference_service/tests/test_result_reporter.py b/tests/test_result_reporter.py similarity index 100% rename from edge_inference_service/tests/test_result_reporter.py rename to tests/test_result_reporter.py diff --git a/edge_inference_service/tests/test_tensorrt.py b/tests/test_tensorrt.py similarity index 100% rename from edge_inference_service/tests/test_tensorrt.py rename to tests/test_tensorrt.py diff --git a/edge_inference_service/tests/test_utils.py b/tests/test_utils.py similarity index 100% rename from edge_inference_service/tests/test_utils.py rename to tests/test_utils.py diff --git a/edge_inference_service/tests/test_video_stream.py b/tests/test_video_stream.py similarity index 100% rename from edge_inference_service/tests/test_video_stream.py rename to tests/test_video_stream.py diff --git a/edge_inference_service/utils/__init__.py b/utils/__init__.py similarity index 100% rename from edge_inference_service/utils/__init__.py rename to utils/__init__.py diff --git a/edge_inference_service/utils/__pycache__/__init__.cpython-310.pyc b/utils/__pycache__/__init__.cpython-310.pyc similarity index 100% rename from edge_inference_service/utils/__pycache__/__init__.cpython-310.pyc rename to utils/__pycache__/__init__.cpython-310.pyc diff --git a/edge_inference_service/utils/__pycache__/common.cpython-310.pyc b/utils/__pycache__/common.cpython-310.pyc similarity index 100% rename from edge_inference_service/utils/__pycache__/common.cpython-310.pyc rename to utils/__pycache__/common.cpython-310.pyc diff --git a/edge_inference_service/utils/__pycache__/logger.cpython-310.pyc b/utils/__pycache__/logger.cpython-310.pyc similarity index 100% rename from edge_inference_service/utils/__pycache__/logger.cpython-310.pyc rename to utils/__pycache__/logger.cpython-310.pyc diff --git a/edge_inference_service/utils/__pycache__/version_control.cpython-310.pyc b/utils/__pycache__/version_control.cpython-310.pyc similarity index 100% rename from edge_inference_service/utils/__pycache__/version_control.cpython-310.pyc rename to utils/__pycache__/version_control.cpython-310.pyc diff --git a/edge_inference_service/utils/common.py b/utils/common.py similarity index 100% rename from edge_inference_service/utils/common.py rename to utils/common.py diff --git a/edge_inference_service/utils/logger.py b/utils/logger.py similarity index 100% rename from edge_inference_service/utils/logger.py rename to utils/logger.py diff --git a/edge_inference_service/utils/version_control.py b/utils/version_control.py similarity index 100% rename from edge_inference_service/utils/version_control.py rename to utils/version_control.py