"""Main entry module.

Wires all components together and starts the edge inference service.
"""
import logging
import os
import sys
import threading
import signal
import time
import traceback
from datetime import datetime
from typing import Dict, Any, Optional, List, Set, Tuple

import numpy as np

# Disable system proxies: proxy tools such as Clash interfere with long-lived
# Redis TCP connections and with HTTP requests. Done before the project imports
# below, presumably so modules that read proxy settings see the cleaned env.
for _key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"):
    os.environ.pop(_key, None)

from config.settings import get_settings, Settings
from core.config_sync import get_config_sync_manager, ConfigSyncManager
from core.debug_http_server import start_debug_http_server
from core.video_stream import MultiStreamManager, VideoFrame
from core.preprocessor import ImagePreprocessor
from core.tensorrt_engine import TensorRTEngine, EngineManager
from core.postprocessor import PostProcessor
from core.result_reporter import AlarmInfo, ResultReporter, generate_alarm_id
from core.alarm_upload_worker import AlarmUploadWorker
from core.screenshot_handler import ScreenshotHandler
from algorithms import AlgorithmManager
from utils.logger import get_logger, StructuredLogger
from utils.version_control import get_version_control

logger = logging.getLogger(__name__)


class EdgeInferenceService:
    """Main edge inference service.

    Integrates config sync, video streams, preprocessing, batched TensorRT
    inference, post-processing, algorithms and alarm reporting into one service.
    """

    def __init__(self):
        self._running = False
        self._settings = get_settings()
        self._logger = get_logger("main")
        self._version_control = get_version_control()

        # Components, created by initialize().
        self._db_manager = None
        self._config_manager: Optional[ConfigSyncManager] = None
        self._stream_manager: Optional[MultiStreamManager] = None
        self._preprocessor: Optional[ImagePreprocessor] = None
        self._engine_manager: Optional[EngineManager] = None
        self._postprocessor: Optional[PostProcessor] = None
        self._reporter: Optional[ResultReporter] = None
        self._alarm_worker: Optional[AlarmUploadWorker] = None
        self._screenshot_handler: Optional[ScreenshotHandler] = None
        self._algorithm_manager: Optional[AlgorithmManager] = None

        self._debug_reload_thread: Optional[threading.Thread] = None
        self._debug_http_server = None
        self._debug_http_thread: Optional[threading.Thread] = None
        self._heartbeat_thread: Optional[threading.Thread] = None

        self._processing_threads: Dict[str, threading.Thread] = {}
        self._stop_event = threading.Event()

        self._performance_stats = {
            "start_time": None,
            "total_frames_processed": 0,
            "total_alerts_generated": 0,
            "uptime_seconds": 0,
        }

        # Producer/consumer batching: stream callbacks append ROI items to the
        # queue, the inference thread drains it in batches.
        self._batch_roi_queue: List[tuple] = []
        self._batch_lock = threading.Lock()
        self._batch_event = threading.Event()
        self._inference_thread: Optional[threading.Thread] = None
        self._max_batch_size = 8
        self._batch_timeout_sec = 0.05  # 50 ms batching window

        # Camera-level dedup: the same camera + alert type is reported at most
        # once per cooldown period.
        self._camera_alert_cooldown: Dict[str, datetime] = {}
        self._camera_cooldown_seconds = 30  # minimum interval (seconds) per camera + alert type

        # ROI-level dedup: while an alarm of the same ROI + type is unresolved,
        # no new alarm is sent.
        # key: f"{roi_id}_{alert_type}", value: alarm_id
        self._active_alarms: Dict[str, str] = {}

        self._logger.info("Edge_Inference_Service 初始化开始")

    # ------------------------------------------------------------------ init

    def _init_database(self):
        """Initialize the local SQLite manager; fall back to no-database mode on failure."""
        try:
            from config.database import SQLiteManager
            self._db_manager = SQLiteManager()
            if self._db_manager._conn:
                self._logger.info("数据库初始化成功")
            else:
                self._logger.warning("数据库不可见,服务将在无数据库模式下运行")
        except Exception as e:
            self._logger.warning(f"数据库初始化失败,服务将在无数据库模式下运行: {e}")
            self._db_manager = None

    def _init_config_manager(self):
        """Create the config sync manager, start its subscription and hook config updates.

        On every ``config_update`` the algorithms are reloaded (state preserved),
        streams of cameras without ROIs are stopped, and streams for newly
        configured cameras are started on a background thread.

        Raises:
            Exception: any failure is logged and re-raised; the service cannot
                run without configuration.
        """
        try:
            manager = get_config_sync_manager()
            if manager is None:
                raise RuntimeError("get_config_sync_manager() returned None")
            self._config_manager = manager
            self._config_manager.start_config_subscription()

            def _on_config_update(topic, data):
                if self._algorithm_manager:
                    # Update params while preserving algorithm state to avoid duplicate alarms.
                    self._algorithm_manager.reload_all_algorithms(preserve_state=True)
                # Stop streams of cameras that no longer have any ROI.
                self._cleanup_cameras_without_roi()
                # Start new camera streams asynchronously so the update (e.g. an
                # HTTP response) is not blocked.
                threading.Thread(
                    target=self._reload_cameras, name="CameraReloader", daemon=True
                ).start()

            self._config_manager.register_callback("config_update", _on_config_update)
            self._logger.info("配置管理器初始化成功")
        except Exception as e:
            self._logger.error(f"配置管理器初始化失败: {e}")
            raise

    def _init_stream_manager(self):
        """Create the multi-stream manager."""
        self._stream_manager = MultiStreamManager()
        self._logger.info("流管理器初始化成功")

    def _init_preprocessor(self):
        """Create the image preprocessor."""
        self._preprocessor = ImagePreprocessor()
        self._logger.info("预处理器初始化成功")

    def _init_engine(self):
        """Create the engine manager and load the "default" engine if the model file exists.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            self._engine_manager = EngineManager()
            engine_path = self._settings.inference.model_path
            if os.path.exists(engine_path):
                self._engine_manager.load_engine("default", engine_path)
                self._logger.info(f"推理引擎加载成功: {engine_path}")
            else:
                self._logger.warning(f"引擎文件不存在: {engine_path}")
        except Exception as e:
            self._logger.error(f"推理引擎初始化失败: {e}")
            raise

    def _init_postprocessor(self):
        """Create the detection post-processor."""
        self._postprocessor = PostProcessor()
        self._logger.info("后处理器初始化成功")

    def _init_reporter(self):
        """Initialize the result reporter and start the alarm upload worker.

        Skipped entirely in LOCAL mode when alarm upload is disabled. Failures of
        either component are logged (with traceback) and do not abort startup.
        """
        if self._settings.config_sync_mode == "LOCAL" and not self._settings.alarm_upload_enabled:
            self._logger.info("LOCAL 模式且 ALARM_UPLOAD_ENABLED=0:跳过告警上报组件初始化")
            return

        try:
            self._reporter = ResultReporter()
            self._logger.info("ResultReporter 对象已创建,准备初始化...")
            self._reporter.initialize()  # sets up the Redis connection and local storage
            self._logger.info("结果上报器初始化成功")
        except Exception as e:
            self._logger.error(f"结果上报器初始化失败: {e}")
            self._logger.error(traceback.format_exc())

        try:
            self._alarm_worker = AlarmUploadWorker()
            self._alarm_worker.start()
            self._logger.info("告警上报 Worker 启动成功")
        except Exception as e:
            self._logger.error(f"告警上报 Worker 启动失败: {e}")
            self._logger.error(traceback.format_exc())

    def _init_algorithm_manager(self):
        """Create the algorithm manager and start its config subscription (failures are logged)."""
        try:
            self._algorithm_manager = AlgorithmManager()
            self._algorithm_manager.start_config_subscription()
            self._logger.info("算法管理器初始化成功")
        except Exception as e:
            self._logger.error(f"算法管理器初始化失败: {e}")

    def _init_screenshot_handler(self):
        """Start the screenshot handler on its own cloud Redis connection.

        The handler must not share the Redis connection used by config_sync:
        both perform blocking XREAD/XREADGROUP calls and would interfere.
        Skipped when cloud Redis is unreachable or no stream manager exists.
        """
        try:
            cloud_redis = None
            try:
                import redis
                cfg = self._settings.cloud_redis
                from core.config_sync import _build_keepalive_options
                cloud_redis = redis.Redis(
                    host=cfg.host,
                    port=cfg.port,
                    db=cfg.db,
                    password=cfg.password,
                    decode_responses=cfg.decode_responses,
                    socket_connect_timeout=5,
                    socket_timeout=10,
                    retry_on_timeout=True,
                    socket_keepalive=True,
                    socket_keepalive_options=_build_keepalive_options(),
                    health_check_interval=15,
                )
                cloud_redis.ping()
                self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
            except Exception as e:
                self._logger.warning(f"截图处理器无法连接云端 Redis: {e}")
                cloud_redis = None

            if cloud_redis and self._stream_manager:
                self._screenshot_handler = ScreenshotHandler(
                    cloud_redis=cloud_redis,
                    stream_manager=self._stream_manager,
                )
                self._screenshot_handler.start()
                self._logger.info("截图处理器初始化成功")
            else:
                self._logger.warning("截图处理器跳过初始化(云端 Redis 不可达或流管理器不可用)")
        except Exception as e:
            self._logger.error(f"截图处理器初始化失败: {e}")

    def _start_debug_reload_watcher(self):
        """Local debugging: poll a signal file and reload local config when its mtime changes.

        Only active in LOCAL sync mode with debug enabled and a config manager.
        """
        if self._settings.config_sync_mode != "LOCAL":
            return
        if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled:
            return
        if not self._config_manager:
            return

        signal_file = self._settings.debug.reload_signal_file

        def worker():
            last_mtime = None
            self._logger.info(f"[DEBUG] 本地同步模式已启用,监听: {signal_file}")
            while not self._stop_event.is_set():
                try:
                    if os.path.exists(signal_file):
                        mtime = os.path.getmtime(signal_file)
                        if last_mtime is None:
                            # First observation only records the baseline.
                            last_mtime = mtime
                        elif mtime != last_mtime:
                            last_mtime = mtime
                            ok = self._config_manager.reload_local_config_from_file()
                            if self._algorithm_manager:
                                self._algorithm_manager.reload_all_algorithms()
                            self._logger.info(f"[DEBUG] 本地配置已重新加载: {ok}")
                except Exception as e:
                    self._logger.warning(f"[DEBUG] 监听本地配置失败: {e}")
                # wait() instead of sleep() so stop() is not delayed by the poll interval.
                self._stop_event.wait(1.0)

        self._debug_reload_thread = threading.Thread(
            target=worker,
            name="LocalConfigReloadWatcher",
            daemon=True,
        )
        self._debug_reload_thread.start()

    def _start_debug_http_server(self):
        """Local debugging: start the HTTP sync server on a daemon thread (LOCAL + debug only)."""
        if self._settings.config_sync_mode != "LOCAL":
            return
        if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled:
            return
        if self._debug_http_server is not None:
            return

        host = self._settings.debug.host
        port = self._settings.debug.port
        self._debug_http_server = start_debug_http_server(host, port)

        def worker():
            try:
                self._debug_http_server.serve_forever()
            except Exception as e:
                self._logger.warning(f"[DEBUG] HTTP 服务器异常: {e}")

        self._debug_http_thread = threading.Thread(
            target=worker,
            name="DebugHttpServer",
            daemon=True,
        )
        self._debug_http_thread.start()

    def _start_heartbeat(self):
        """Start a daemon thread that posts device status to the cloud every 30 seconds.

        Targets ``<cloud_api_url>/api/ai/device/heartbeat`` and, when configured,
        ``<wvp_api_url>/api/ai/device/heartbeat``. Failures are logged and the
        next attempt happens on the following cycle; the loop ends on stop().
        """
        def worker():
            import requests

            alarm_upload = self._settings.alarm_upload
            base_url = alarm_upload.cloud_api_url.rstrip("/")
            wvp_url = alarm_upload.wvp_api_url.rstrip("/") if alarm_upload.wvp_api_url else ""
            urls = [f"{base_url}/api/ai/device/heartbeat"]
            if wvp_url:
                urls.append(f"{wvp_url}/api/ai/device/heartbeat")
            device_id = self._settings.mqtt.device_id
            self._logger.info(f"[心跳] 守护线程已启动, 目标: {urls}, device_id={device_id}")

            while not self._stop_event.is_set():
                try:
                    start_time = self._performance_stats.get("start_time")
                    uptime = (datetime.now() - start_time).total_seconds() if start_time else 0
                    stream_count = len(self._stream_manager._streams) if self._stream_manager else 0
                    config_version = self._config_manager.config_version if self._config_manager else None
                    payload = {
                        "device_id": device_id,
                        "status": {
                            "uptime_seconds": int(uptime),
                            "frames_processed": self._performance_stats.get("total_frames_processed", 0),
                            "alerts_generated": self._performance_stats.get("total_alerts_generated", 0),
                            "stream_count": stream_count,
                            "config_version": config_version,
                            "stream_stats": {
                                "active_streams": stream_count,
                            },
                        },
                    }
                    for url in urls:
                        try:
                            resp = requests.post(
                                url, json=payload, timeout=10,
                                proxies={"http": None, "https": None},
                            )
                            if resp.status_code == 200:
                                self._logger.debug(f"[心跳] {url} 上报成功")
                            else:
                                self._logger.warning(f"[心跳] {url} 上报失败: HTTP {resp.status_code}")
                        except Exception as e:
                            self._logger.warning(f"[心跳] {url} 上报异常: {e}")
                except Exception as e:
                    self._logger.warning(f"[心跳] 上报异常: {e}")
                self._stop_event.wait(30)

        self._heartbeat_thread = threading.Thread(
            target=worker,
            name="HeartbeatWorker",
            daemon=True,
        )
        self._heartbeat_thread.start()

    def initialize(self):
        """Initialize all components in dependency order and record the startup."""
        self._logger.info("=" * 50)
        self._logger.info("Edge_Inference_Service 启动")
        self._logger.info("=" * 50)

        self._init_database()
        self._init_config_manager()
        self._init_stream_manager()
        self._init_preprocessor()
        self._init_engine()
        self._init_postprocessor()
        self._init_reporter()
        self._init_algorithm_manager()
        self._init_screenshot_handler()
        self._start_debug_reload_watcher()
        self._start_debug_http_server()
        self._start_heartbeat()

        self._performance_stats["start_time"] = datetime.now()

        self._version_control.record_update(
            version="1.0.0",
            update_type="启动",
            description="Edge_Inference_Service 启动运行",
            updated_by="系统",
            affected_items=["全局"],
        )

        self._logger.info("所有组件初始化完成")

    # --------------------------------------------------------------- cameras

    def _get_camera_area_id(self, camera_id: str) -> Optional[int]:
        """Return the camera's ``area_id``, or None if the camera is unknown."""
        cam = self._get_camera_config_by_id(camera_id)
        return cam.area_id if cam else None

    def _fetch_camera_ids_with_roi(self) -> Optional[Set[str]]:
        """Return IDs of cameras referenced by at least one ROI, or None if the lookup failed.

        Distinguishing "lookup failed" (None) from "no ROI configured" (empty set)
        lets callers avoid destructive actions on transient errors.
        """
        try:
            all_rois = self._config_manager.get_roi_configs(force_refresh=True)
            return {roi.camera_id for roi in all_rois if roi.camera_id}
        except Exception as e:
            self._logger.error(f"获取ROI配置失败: {e}")
            return None

    def _get_camera_ids_with_roi(self) -> set:
        """Return the set of camera IDs that have ROI configuration.

        Returns:
            set: camera IDs referenced by ROIs; empty if none or if the lookup failed.
        """
        camera_ids = self._fetch_camera_ids_with_roi()
        return camera_ids if camera_ids is not None else set()

    def _get_camera_config_by_id(self, camera_id: str):
        """Look up a camera's configuration by ID.

        Args:
            camera_id: camera ID.

        Returns:
            CameraInfoModel or None: the camera config, or None if not found or on error.
        """
        try:
            for camera in self._config_manager.get_cameras():
                if camera.camera_id == camera_id:
                    return camera
            return None
        except Exception as e:
            self._logger.error(f"获取摄像头配置失败 {camera_id}: {e}")
            return None

    def _add_camera_stream(self, camera_id: str, start: bool = False) -> bool:
        """Register a stream for ``camera_id``; optionally start it immediately.

        Args:
            camera_id: camera to add.
            start: when True the new stream is started right away (used for
                cameras added after ``start_all`` already ran).

        Returns:
            bool: True on success; False when the config or rtsp_url is missing
            or adding/starting the stream raised (all logged).
        """
        camera = self._get_camera_config_by_id(camera_id)
        if not camera:
            self._logger.warning(f"摄像头 {camera_id} 有ROI但未找到配置,跳过")
            return False
        if not camera.rtsp_url:
            self._logger.warning(f"摄像头 {camera_id} 缺少 rtsp_url,跳过")
            return False

        try:
            self._stream_manager.add_stream(
                camera_id=camera.camera_id,
                rtsp_url=camera.rtsp_url,
                target_fps=self._settings.video_stream.default_fps,
                on_frame_callback=self._create_frame_callback(camera.camera_id),
            )
            if start:
                self._stream_manager._streams[camera.camera_id].start()
                self._logger.info(f"动态添加并启动摄像头: {camera.camera_id}")
            else:
                self._logger.info(f"已添加摄像头: {camera.camera_id}")
            return True
        except Exception as e:
            if start:
                self._logger.error(f"动态添加摄像头失败 {camera.camera_id}: {e}")
            else:
                self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}")
            return False

    def _load_cameras(self):
        """Register streams only for cameras that have ROI configuration.

        Cameras without ROIs are not opened, to save resources. Streams are only
        added here; ``start()`` launches them with ``start_all``.
        """
        camera_ids_with_roi = self._get_camera_ids_with_roi()
        if not camera_ids_with_roi:
            self._logger.warning("未找到任何ROI配置,不启动视频流")
            return

        self._logger.info(f"检测到 {len(camera_ids_with_roi)} 个摄像头有ROI配置")

        success_count = 0
        failed_count = 0
        for camera_id in camera_ids_with_roi:
            if self._add_camera_stream(camera_id):
                success_count += 1
            else:
                failed_count += 1

        self._logger.info(
            f"摄像头加载完成 - 成功: {success_count}, 失败: {failed_count}, "
            f"总计: {len(camera_ids_with_roi)}"
        )

    def _reload_cameras(self):
        """After a config update, add and start streams for cameras that have ROIs but no stream yet."""
        if not self._stream_manager or not self._config_manager:
            return

        try:
            camera_ids_with_roi = self._get_camera_ids_with_roi()
            if not camera_ids_with_roi:
                self._logger.debug("配置更新后未找到有ROI配置的摄像头")
                return

            existing_streams = set(self._stream_manager._streams.keys())
            new_camera_ids = camera_ids_with_roi - existing_streams
            if not new_camera_ids:
                self._logger.debug("配置更新后无需新增摄像头流")
                return

            self._logger.info(f"检测到 {len(new_camera_ids)} 个新摄像头需要启动")

            success_count = 0
            failed_count = 0
            for camera_id in new_camera_ids:
                if self._add_camera_stream(camera_id, start=True):
                    success_count += 1
                else:
                    failed_count += 1

            if success_count > 0:
                self._logger.info(
                    f"配置更新后新增摄像头完成 - 成功: {success_count}, 失败: {failed_count}"
                )
        except Exception as e:
            self._logger.error(f"动态加载摄像头失败: {e}")

    def _cleanup_cameras_without_roi(self):
        """Stop and remove streams of cameras that no longer have any ROI.

        If the ROI lookup itself fails, nothing is removed: a transient config
        error must not tear down every running stream.
        """
        if not self._stream_manager or not self._config_manager:
            return

        try:
            camera_ids_with_roi = self._fetch_camera_ids_with_roi()
            if camera_ids_with_roi is None:
                self._logger.warning("获取ROI配置失败,跳过摄像头流清理")
                return

            running_streams = set(self._stream_manager._streams.keys())
            cameras_to_remove = running_streams - camera_ids_with_roi
            if not cameras_to_remove:
                self._logger.debug("无需清理摄像头视频流")
                return

            self._logger.info(f"检测到 {len(cameras_to_remove)} 个摄像头无ROI配置,需要停止")

            removed_count = 0
            for camera_id in cameras_to_remove:
                try:
                    self._stream_manager.remove_stream(camera_id)
                    removed_count += 1
                    self._logger.info(f"已停止并移除摄像头流: {camera_id} (无ROI配置)")
                except Exception as e:
                    self._logger.error(f"移除摄像头流失败 {camera_id}: {e}")

            if removed_count > 0:
                self._logger.info(f"清理完成 - 已移除 {removed_count} 个无ROI的摄像头流")
        except Exception as e:
            self._logger.error(f"清理摄像头流失败: {e}")

    # ------------------------------------------------------------- inference

    def _create_frame_callback(self, camera_id: str):
        """Return a per-camera frame callback bound to ``camera_id``."""
        def callback(frame):
            self._process_frame(camera_id, frame)
        return callback

    def _process_frame(self, camera_id: str, frame: VideoFrame):
        """Frame callback: crop every enabled ROI and enqueue one item per enabled binding.

        Each ROI is preprocessed once and its crop is shared by all enabled
        bindings. Queue items are tuples
        ``(camera_id, roi, bind, frame, cropped, scale_info)`` that the inference
        thread consumes. Errors are logged, never raised.
        """
        try:
            roi_configs = self._config_manager.get_roi_configs_with_bindings(camera_id)

            # Periodic status line (diagnostic, DEBUG level) when the counter is a multiple of 100.
            if self._performance_stats["total_frames_processed"] % 100 == 0:
                self._logger.debug(f"[{camera_id}] 已处理 {self._performance_stats['total_frames_processed']} 帧, ROI数: {len(roi_configs)}")

            roi_items = []
            for roi in roi_configs:
                if not roi.enabled or not roi.bindings:
                    continue
                enabled_binds = [bind for bind in roi.bindings if bind.enabled]
                if not enabled_binds:
                    continue
                try:
                    cropped, scale_info = self._preprocessor.preprocess_single(frame.image, roi)
                except Exception as e:
                    self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}")
                    continue
                roi_items.extend(
                    (camera_id, roi, bind, frame, cropped, scale_info) for bind in enabled_binds
                )

            if not roi_items:
                return

            with self._batch_lock:
                self._batch_roi_queue.extend(roi_items)
            # Wake the inference thread.
            self._batch_event.set()

            self._performance_stats["total_frames_processed"] += 1
        except Exception as e:
            self._logger.error(f"处理帧失败 {camera_id}: {e}")

    def _batch_process_rois(self):
        """Drain the ROI queue and run batched inference in chunks of ``_max_batch_size``.

        For each chunk: the crops are stacked into one batch (expected
        [N, 3, H, W]) and inferred in a single call. The outputs are
        post-processed with per-item confidence thresholds chosen by algorithm
        code. Every item is then passed to ``_handle_detections``, even with zero
        detections, because some algorithms (e.g. leave-post) need the
        "nobody present" signal. A failing chunk is logged and skipped. Drained
        items are discarded if no "default" engine is loaded.
        """
        with self._batch_lock:
            roi_items = self._batch_roi_queue
            if not roi_items:
                return
            self._batch_roi_queue = []

        engine = self._engine_manager.get_engine("default")
        if engine is None:
            return

        for chunk_start in range(0, len(roi_items), self._max_batch_size):
            chunk = roi_items[chunk_start:chunk_start + self._max_batch_size]
            try:
                images = [item[4] for item in chunk]

                batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch(images)
                outputs, inference_time_ms = engine.infer(batch_data)

                # Raw output shape diagnostics (DEBUG level).
                if isinstance(outputs, np.ndarray):
                    self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms")
                elif isinstance(outputs, (list, tuple)):
                    shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs]
                    self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms")

                batch_size = len(chunk)
                # Per-item confidence threshold, chosen by the binding's algorithm code.
                per_item_conf = [
                    self._settings.inference.get_conf_threshold(item[2].algo_code)
                    for item in chunk
                ]
                batch_results = self._postprocessor.batch_process_detections(
                    outputs,
                    batch_size,
                    per_item_conf_thresholds=per_item_conf,
                )

                total_detections = sum(len(r[0]) for r in batch_results)
                self._logger.debug(
                    f"[推理] batch_size={batch_size}, 总检测数={total_detections}, "
                    f"conf_thresholds={per_item_conf}"
                )

                for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(chunk):
                    boxes, scores, class_ids = batch_results[idx]
                    self._handle_detections(
                        camera_id, roi, bind, frame,
                        boxes, scores, class_ids, scale_info
                    )
            except Exception as e:
                self._logger.error(f"批量处理 ROI 失败: {e}")

    def _process_roi_frame(self, camera_id: str, frame: VideoFrame, roi):
        """No-op kept for interface compatibility; ROI collection happens in ``_process_frame``."""
        pass

    def _build_tracks(
        self, roi, boxes: Any, scores: Any, class_ids: Any, scale_info: tuple
    ) -> list:
        """Convert raw detections into the ``tracks`` list consumed by algorithms.

        Boxes are mapped back from letterbox space into ROI-crop space via
        ``revert_boxes``. Class ids outside ``class_names`` (including negative
        ids) are labelled ``class_<id>``.
        """
        class_names = getattr(self._settings, 'class_names', ['person'])

        reverted_boxes = self._preprocessor.revert_boxes(
            [box.tolist() if hasattr(box, 'tolist') else list(box) for box in boxes],
            scale_info,
        )

        tracks = []
        for i, box in enumerate(reverted_boxes):
            class_id = int(class_ids[i]) if class_ids[i] else 0
            if 0 <= class_id < len(class_names):
                class_label = class_names[class_id]
            else:
                class_label = f"class_{class_id}"
            tracks.append({
                "track_id": f"{roi.roi_id}_{i}",
                "class": class_label,
                "confidence": float(scores[i]),
                "bbox": box,
                "matched_rois": [{"roi_id": roi.roi_id}],
            })
        return tracks

    def _get_algorithm_instance(self, roi_id: str, bind_id, alert_type: str, algo_code: str):
        """Return the algorithm instance for this ROI/binding, or None.

        Looks in ``algorithms[roi_id][f"{roi_id}_{bind_id}"]`` by ``alert_type``
        first, then falls back to ``algo_code`` (the ``algorithm_type`` used at
        registration). NOTE(review): confirm which key AlgorithmManager uses.
        """
        per_bind = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind_id}", {})
        algo = per_bind.get(alert_type)
        if algo is None:
            algo = per_bind.get(algo_code)
        return algo

    @staticmethod
    def _get_first_frame_time(algo) -> Optional[str]:
        """Return the event start time (ISO string) stored on an algorithm instance, if any.

        Algorithms keep their start time in differently named private attributes;
        the first truthy one is used.
        """
        if not algo:
            return None
        for attr in ('_leave_start_time', '_parking_start_time',
                     '_congestion_start_time', '_intrusion_start_time'):
            val = getattr(algo, attr, None)
            if val:
                return val.isoformat()
        return None

    def _handle_alarm_resolve(self, alert: Dict[str, Any]):
        """Forward an ``alarm_resolve`` event and clear its ROI-level active-alarm entry."""
        resolve_alarm_id = alert.get("resolve_alarm_id")
        resolve_data = {
            "alarm_id": resolve_alarm_id,
            "duration_ms": alert.get("duration_ms"),
            "last_frame_time": alert.get("last_frame_time"),
            "resolve_type": alert.get("resolve_type"),
        }
        if self._reporter:
            self._reporter.report_alarm_resolve(resolve_data)

        # Drop the active-alarm record so later alarms of the same ROI/type are allowed.
        for k, v in list(self._active_alarms.items()):
            if v == resolve_alarm_id:
                del self._active_alarms[k]
                self._logger.debug(f"[去重] 活跃告警已清除: {k} -> {resolve_alarm_id}")
                break

        self._logger.info(
            f"告警已结束: alarm_id={resolve_data['alarm_id']}, "
            f"duration_ms={resolve_data['duration_ms']}, "
            f"reason={resolve_data['resolve_type']}"
        )

    def _handle_detections(
        self, camera_id: str, roi, bind, frame: VideoFrame,
        boxes: Any, scores: Any, class_ids: Any, scale_info: tuple
    ):
        """Run the bound algorithm on one ROI's detections and report resulting alarms.

        The algorithm decides whether to alert. New alarms are de-duplicated:
          * ROI level: an unresolved alarm of the same ROI + type suppresses new ones;
          * camera level: the same camera + type is reported at most once per
            ``_camera_cooldown_seconds`` (measured on frame timestamps).
        ``alarm_resolve`` events are forwarded and clear the ROI-level record.
        Nothing runs when the algorithm manager or reporter is unavailable.
        All exceptions are logged and swallowed.
        """
        try:
            if self._algorithm_manager is None:
                self._logger.warning("算法管理器不可用,跳过算法处理")
                return
            if self._reporter is None:
                self._logger.debug("ResultReporter 未启用,跳过告警上报")
                return

            roi_id = roi.roi_id
            algo_code = bind.algo_code
            algo_params = bind.params or {}

            if len(boxes) > 0:
                self._logger.debug(f"[{camera_id}] ROI={roi_id[:8]} 检测到 {len(boxes)} 个目标, algo={algo_code}")

            self._algorithm_manager.register_algorithm(
                roi_id=roi_id,
                bind_id=bind.bind_id,
                algorithm_type=algo_code,
                params=algo_params
            )

            # The algorithm must run even with empty tracks: leave-post detection
            # relies on the "nobody present" signal.
            tracks = self._build_tracks(roi, boxes, scores, class_ids, scale_info)
            self._logger.debug(f"[{camera_id}] tracks: {[t.get('class') for t in tracks]}, target_class={bind.target_class}")

            alerts = self._algorithm_manager.process(
                roi_id=roi_id,
                bind_id=bind.bind_id,
                camera_id=camera_id,
                algorithm_type=algo_code,
                tracks=tracks,
                current_time=frame.timestamp
            )

            if alerts:
                self._logger.info(f"[{camera_id}] 算法 {algo_code} 返回 {len(alerts)} 个告警")
            else:
                algo_status = self._algorithm_manager.get_status(roi_id)
                self._logger.debug(f"[{camera_id}] 算法 {algo_code} 无告警, 状态: {algo_status}")

            for alert in alerts or []:
                alert_type = alert.get("alert_type", "detection")

                if alert_type == "alarm_resolve":
                    self._handle_alarm_resolve(alert)
                    continue

                # ROI-level dedup.
                active_key = f"{roi_id}_{alert_type}"
                if active_key in self._active_alarms:
                    self._logger.debug(
                        f"[去重] 跳过告警: roi={roi_id}, type={alert_type}, "
                        f"存在未结束告警={self._active_alarms[active_key]}"
                    )
                    continue

                # Camera-level cooldown dedup.
                dedup_key = f"{camera_id}_{alert_type}"
                now = frame.timestamp
                last_alert_time = self._camera_alert_cooldown.get(dedup_key)
                if last_alert_time is not None:
                    elapsed = (now - last_alert_time).total_seconds()
                    if elapsed < self._camera_cooldown_seconds:
                        self._logger.debug(
                            f"[去重] 跳过告警: camera={camera_id}, type={alert_type}, "
                            f"roi={roi_id}, 距上次={elapsed:.1f}s < {self._camera_cooldown_seconds}s"
                        )
                        continue
                self._camera_alert_cooldown[dedup_key] = now

                self._performance_stats["total_alerts_generated"] += 1

                algo = self._get_algorithm_instance(roi_id, bind.bind_id, alert_type, algo_code)
                first_frame_time = self._get_first_frame_time(algo)

                duration_minutes = alert.get("duration_minutes")
                alarm_info = AlarmInfo(
                    alarm_id=generate_alarm_id(self._settings.mqtt.device_id),
                    alarm_type=alert_type,
                    device_id=camera_id,
                    scene_id=roi_id,
                    event_time=frame.timestamp.isoformat(),
                    alarm_level=alert.get("alarm_level", 2),
                    algorithm_code=algo_code,
                    confidence_score=alert.get("confidence", 1.0),
                    ext_data={
                        "duration_ms": int(duration_minutes * 60 * 1000) if duration_minutes else None,
                        "roi_id": roi_id,
                        "bbox": alert.get("bbox", []),
                        "target_class": alert.get("class", bind.target_class or "unknown"),
                        "bind_id": bind.bind_id,
                        "message": alert.get("message", ""),
                        "edge_node_id": self._settings.mqtt.device_id,
                        "first_frame_time": first_frame_time,
                        "vehicle_count": alert.get("vehicle_count"),
                        "area_id": self._get_camera_area_id(camera_id),
                    },
                )
                self._reporter.report_alarm(alarm_info, screenshot=frame.image)

                # Remember the active alarm for ROI-level dedup.
                self._active_alarms[active_key] = alarm_info.alarm_id

                # Hand the alarm_id back to the algorithm so it can emit a matching resolve later.
                if algo and hasattr(algo, 'set_last_alarm_id'):
                    algo.set_last_alarm_id(alarm_info.alarm_id)

                self._logger.info(
                    f"告警已生成: type={alert_type}, "
                    f"camera={camera_id}, roi={roi_id}, "
                    f"confidence={alert.get('confidence', 1.0)}"
                )
        except Exception as e:
            self._logger.error(f"处理检测结果失败: {e}")

    def _inference_worker(self):
        """Inference thread loop: gather ROI items within a short window, then run batched inference.

        A partial batch gets one extra window to fill up. Exceptions from a batch
        are logged so the thread keeps running.
        """
        while not self._stop_event.is_set():
            self._batch_event.wait(timeout=self._batch_timeout_sec)
            self._batch_event.clear()

            with self._batch_lock:
                queue_size = len(self._batch_roi_queue)

            if 0 < queue_size < self._max_batch_size:
                # Wait one more window to gather a fuller batch.
                self._batch_event.wait(timeout=self._batch_timeout_sec)
                self._batch_event.clear()

            with self._batch_lock:
                if not self._batch_roi_queue:
                    continue

            try:
                self._batch_process_rois()
            except Exception as e:
                self._logger.error(f"批量处理 ROI 失败: {e}")

    # -------------------------------------------------------------- lifecycle

    def start(self):
        """Load cameras, start the inference thread and all streams, then block until shutdown."""
        if self._running:
            return

        self._running = True
        self._stop_event.clear()

        self._load_cameras()

        # Dedicated inference thread (producer/consumer).
        self._inference_thread = threading.Thread(
            target=self._inference_worker,
            name="InferenceWorker",
            daemon=True
        )
        self._inference_thread.start()
        self._logger.info("推理线程已启动")

        self._stream_manager.start_all()

        self._logger.info("Edge_Inference_Service 已启动")

        self._register_signal_handlers()
        self._wait_for_shutdown()

    def _register_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers that stop the service."""
        def handle_signal(signum, frame):
            self._logger.info(f"收到信号 {signum}, 正在停止服务...")
            self.stop()

        signal.signal(signal.SIGINT, handle_signal)
        signal.signal(signal.SIGTERM, handle_signal)

    def _wait_for_shutdown(self):
        """Block the calling thread until the stop event is set."""
        while not self._stop_event.is_set():
            time.sleep(1)

    def stop(self):
        """Stop all threads and components; no-op if the service is not running."""
        if not self._running:
            return

        self._running = False
        self._stop_event.set()
        self._batch_event.set()  # wake the inference thread so it can exit

        if self._inference_thread and self._inference_thread.is_alive():
            self._inference_thread.join(timeout=5)

        if self._stream_manager:
            self._stream_manager.stop_all()
            self._stream_manager.close()

        if self._engine_manager:
            self._engine_manager.release_all()

        if self._config_manager:
            self._config_manager.stop_config_subscription()
            self._config_manager.close()

        if self._algorithm_manager:
            self._algorithm_manager.stop_config_subscription()

        if self._alarm_worker:
            self._alarm_worker.stop()

        if self._screenshot_handler:
            self._screenshot_handler.stop()

        if self._reporter:
            self._reporter.close()

        if self._debug_http_server:
            try:
                self._debug_http_server.shutdown()
            except Exception:
                pass  # best-effort shutdown of the debug server

        start_time = self._performance_stats["start_time"]
        if start_time is not None:
            self._performance_stats["uptime_seconds"] = (
                (datetime.now() - start_time).total_seconds()
            )

        self._logger.info("Edge_Inference_Service 已停止")
        self._logger.info(f"运行统计: {self._performance_stats}")

    def get_status(self) -> Dict[str, Any]:
        """Return a status snapshot; ``uptime_seconds`` is computed live while running."""
        start_time = self._performance_stats["start_time"]
        uptime_seconds = self._performance_stats["uptime_seconds"]
        if self._running and start_time is not None:
            uptime_seconds = (datetime.now() - start_time).total_seconds()

        return {
            "running": self._running,
            "start_time": start_time.isoformat() if start_time else None,
            "uptime_seconds": uptime_seconds,
            "total_frames_processed": self._performance_stats["total_frames_processed"],
            "total_alerts_generated": self._performance_stats["total_alerts_generated"],
            "stream_manager": (
                self._stream_manager.get_statistics()
                if self._stream_manager else {}
            ),
            "config_version": (
                self._config_manager.config_version
                if self._config_manager else None
            ),
        }


def main():
    """Entry point: build, initialize and run the service until shutdown.

    On an unexpected error the service is stopped (``stop`` is a no-op if it
    never started) and the error is re-raised.
    """
    service = EdgeInferenceService()
    try:
        service.initialize()
        service.start()
    except KeyboardInterrupt:
        service.stop()
    except Exception as e:
        logger.error(f"服务异常: {e}")
        service.stop()
        raise


if __name__ == "__main__":
    main()