# security-ai-edge/main.py
"""
主入口模块
整合所有模块启动推理服务
"""
import logging
import os
import signal
import sys
import threading
import time
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple

import numpy as np
from config.settings import get_settings, Settings
from core.config_sync import get_config_sync_manager, ConfigSyncManager
from core.debug_http_server import start_debug_http_server
from core.video_stream import MultiStreamManager, VideoFrame
from core.preprocessor import ImagePreprocessor
from core.tensorrt_engine import TensorRTEngine, EngineManager
from core.postprocessor import PostProcessor
from core.result_reporter import ResultReporter
from core.alarm_upload_worker import AlarmUploadWorker
from algorithms import AlgorithmManager
from utils.logger import get_logger, StructuredLogger
from utils.version_control import get_version_control
logger = logging.getLogger(__name__)
class EdgeInferenceService:
"""边缘推理服务主类
整合所有模块提供完整的推理服务
"""
def __init__(self):
self._running = False
self._settings = get_settings()
self._logger = get_logger("main")
self._version_control = get_version_control()
self._db_manager = None
self._config_manager: Optional[ConfigSyncManager] = None
self._stream_manager: Optional[MultiStreamManager] = None
self._preprocessor: Optional[ImagePreprocessor] = None
self._engine_manager: Optional[EngineManager] = None
self._postprocessor: Optional[PostProcessor] = None
self._reporter: Optional[ResultReporter] = None
self._alarm_worker: Optional[AlarmUploadWorker] = None
self._algorithm_manager: Optional[AlgorithmManager] = None
self._debug_reload_thread: Optional[threading.Thread] = None
self._debug_http_server = None
self._debug_http_thread: Optional[threading.Thread] = None
self._processing_threads: Dict[str, threading.Thread] = {}
self._stop_event = threading.Event()
self._performance_stats = {
"start_time": None,
"total_frames_processed": 0,
"total_alerts_generated": 0,
"uptime_seconds": 0,
}
self._batch_roi_queue: List[tuple] = []
self._batch_lock = threading.Lock()
self._batch_event = threading.Event()
self._inference_thread: Optional[threading.Thread] = None
self._max_batch_size = 8
self._batch_timeout_sec = 0.05 # 50ms 攒批窗口
# 摄像头级别告警去重:同一摄像头+告警类型在冷却期内只上报一次
self._camera_alert_cooldown: Dict[str, datetime] = {}
self._camera_cooldown_seconds = 30 # 同摄像头同类型告警最小间隔(秒)
self._logger.info("Edge_Inference_Service 初始化开始")
def _init_database(self):
"""初始化数据库"""
try:
from config.database import SQLiteManager
self._db_manager = SQLiteManager()
if self._db_manager._conn:
self._logger.info("数据库初始化成功")
else:
self._logger.warning("数据库不可见,服务将在无数据库模式下运行")
except Exception as e:
self._logger.warning(f"数据库初始化失败,服务将在无数据库模式下运行: {e}")
self._db_manager = None
def _init_config_manager(self):
"""初始化配置管理器"""
try:
self._config_manager = get_config_sync_manager()
self._config_manager.start_config_subscription()
if self._settings.config_sync_mode == "LOCAL" and self._config_manager:
def _on_config_update(topic, data):
if self._algorithm_manager:
self._algorithm_manager.reload_all_algorithms()
# 配置更新后清理无ROI的摄像头流
self._cleanup_cameras_without_roi()
# 配置更新后动态加载新摄像头流有ROI的
self._reload_cameras()
self._config_manager.register_callback("config_update", _on_config_update)
self._logger.info("配置管理器初始化成功")
except Exception as e:
self._logger.error(f"配置管理器初始化失败: {e}")
raise
def _init_stream_manager(self):
"""初始化流管理器"""
self._stream_manager = MultiStreamManager()
self._logger.info("流管理器初始化成功")
def _init_preprocessor(self):
"""初始化预处理器"""
self._preprocessor = ImagePreprocessor()
self._logger.info("预处理器初始化成功")
def _init_engine(self):
"""初始化推理引擎"""
try:
self._engine_manager = EngineManager()
engine_path = self._settings.inference.model_path
if os.path.exists(engine_path):
self._engine_manager.load_engine("default", engine_path)
self._logger.info(f"推理引擎加载成功: {engine_path}")
else:
self._logger.warning(f"引擎文件不存在: {engine_path}")
except Exception as e:
self._logger.error(f"推理引擎初始化失败: {e}")
raise
def _init_postprocessor(self):
"""初始化后处理器"""
self._postprocessor = PostProcessor()
self._logger.info("后处理器初始化成功")
def _init_reporter(self):
    """Initialize the result reporter and the alarm upload worker.

    Skipped entirely in LOCAL mode with alarm upload disabled. Failures are
    logged but non-fatal; on reporter failure self._reporter is reset to
    None so the detection path (which tests `self._reporter is None`) skips
    alarm reporting cleanly instead of using a half-initialized object.
    """
    if self._settings.config_sync_mode == "LOCAL" and not self._settings.alarm_upload_enabled:
        self._logger.info("LOCAL 模式且 ALARM_UPLOAD_ENABLED=0跳过告警上报组件初始化")
        return
    try:
        self._reporter = ResultReporter()
        self._logger.info("ResultReporter 对象已创建,准备初始化...")
        self._reporter.initialize()  # sets up the Redis connection and local storage
        self._logger.info("结果上报器初始化成功")
    except Exception as e:
        self._logger.error(f"结果上报器初始化失败: {e}")
        import traceback
        self._logger.error(traceback.format_exc())
        # Fix: don't keep a partially-initialized reporter around — downstream
        # code uses `self._reporter is None` as the "reporting enabled" test.
        self._reporter = None
    # Start the alarm upload worker (independent of reporter success).
    try:
        self._alarm_worker = AlarmUploadWorker()
        self._alarm_worker.start()
        self._logger.info("告警上报 Worker 启动成功")
    except Exception as e:
        self._logger.error(f"告警上报 Worker 启动失败: {e}")
        import traceback
        self._logger.error(traceback.format_exc())
def _init_algorithm_manager(self):
"""初始化算法管理器"""
try:
self._algorithm_manager = AlgorithmManager()
self._algorithm_manager.start_config_subscription()
self._logger.info("算法管理器初始化成功")
except Exception as e:
self._logger.error(f"算法管理器初始化失败: {e}")
def _start_debug_reload_watcher(self):
    """LOCAL-debug only: poll a signal file and hot-reload config on touch.

    A daemon thread watches the signal file's mtime once per second; any
    mtime change after the first sighting triggers a local config reload
    plus an algorithm reload.
    """
    if self._settings.config_sync_mode != "LOCAL":
        return
    debug_cfg = getattr(self._settings, "debug", None)
    if not debug_cfg or not debug_cfg.enabled:
        return
    if not self._config_manager:
        return
    signal_file = debug_cfg.reload_signal_file

    def worker():
        previous_mtime = None
        self._logger.info(f"[DEBUG] 本地同步模式已启用,监听: {signal_file}")
        while not self._stop_event.is_set():
            try:
                if os.path.exists(signal_file):
                    current = os.path.getmtime(signal_file)
                    if previous_mtime is None:
                        # First sighting only records the baseline mtime.
                        previous_mtime = current
                    elif current != previous_mtime:
                        previous_mtime = current
                        ok = self._config_manager.reload_local_config_from_file()
                        if self._algorithm_manager:
                            self._algorithm_manager.reload_all_algorithms()
                        self._logger.info(f"[DEBUG] 本地配置已重新加载: {ok}")
                time.sleep(1.0)
            except Exception as e:
                self._logger.warning(f"[DEBUG] 监听本地配置失败: {e}")
                time.sleep(1.0)

    watcher = threading.Thread(
        target=worker,
        name="LocalConfigReloadWatcher",
        daemon=True,
    )
    self._debug_reload_thread = watcher
    watcher.start()
def _start_debug_http_server(self):
    """LOCAL-debug only: serve the debug HTTP sync endpoint in a daemon thread.

    No-op outside LOCAL mode, when debug is disabled, or when a server is
    already running.
    """
    if self._settings.config_sync_mode != "LOCAL":
        return
    debug_cfg = getattr(self._settings, "debug", None)
    if not debug_cfg or not debug_cfg.enabled:
        return
    if self._debug_http_server is not None:
        return
    self._debug_http_server = start_debug_http_server(debug_cfg.host, debug_cfg.port)

    def serve():
        try:
            self._debug_http_server.serve_forever()
        except Exception as e:
            self._logger.warning(f"[DEBUG] HTTP 服务器异常: {e}")

    server_thread = threading.Thread(
        target=serve,
        name="DebugHttpServer",
        daemon=True,
    )
    self._debug_http_thread = server_thread
    server_thread.start()
def initialize(self):
    """Bring up every component in dependency order and record the start.

    Component init methods are invoked strictly in order; a fatal init step
    (config manager, engine) raises and aborts startup.
    """
    banner = "=" * 50
    self._logger.info(banner)
    self._logger.info("Edge_Inference_Service 启动")
    self._logger.info(banner)
    init_steps = (
        self._init_database,
        self._init_config_manager,
        self._init_stream_manager,
        self._init_preprocessor,
        self._init_engine,
        self._init_postprocessor,
        self._init_reporter,
        self._init_algorithm_manager,
        self._start_debug_reload_watcher,
        self._start_debug_http_server,
    )
    for step in init_steps:
        step()
    self._performance_stats["start_time"] = datetime.now()
    # Record this startup in the version-control audit trail.
    self._version_control.record_update(
        version="1.0.0",
        update_type="启动",
        description="Edge_Inference_Service 启动运行",
        updated_by="系统",
        affected_items=["全局"],
    )
    self._logger.info("所有组件初始化完成")
def _get_camera_ids_with_roi(self) -> set:
    """Return the set of camera IDs that have at least one ROI configured.

    Returns:
        set: camera IDs with ROI configs; empty set when the fetch fails.
    """
    try:
        rois = self._config_manager.get_roi_configs(force_refresh=True)
        return {r.camera_id for r in rois if r.camera_id}
    except Exception as e:
        self._logger.error(f"获取ROI配置失败: {e}")
        return set()
def _get_camera_config_by_id(self, camera_id: str):
    """Look up a camera's config object by ID.

    Args:
        camera_id: camera identifier to find.

    Returns:
        The matching camera config, or None when absent or on fetch error.
    """
    try:
        cameras = self._config_manager.get_cameras()
        return next((c for c in cameras if c.camera_id == camera_id), None)
    except Exception as e:
        self._logger.error(f"获取摄像头配置失败 {camera_id}: {e}")
        return None
def _load_cameras(self):
    """Start video streams only for cameras that have ROI configs.

    Cameras without any ROI are skipped entirely, so no resources are spent
    on streams whose frames would never be analyzed.
    """
    roi_camera_ids = self._get_camera_ids_with_roi()
    if not roi_camera_ids:
        self._logger.warning("未找到任何ROI配置不启动视频流")
        return
    self._logger.info(f"检测到 {len(roi_camera_ids)} 个摄像头有ROI配置")
    success_count = 0
    failed_count = 0
    for camera_id in roi_camera_ids:
        camera = self._get_camera_config_by_id(camera_id)
        if camera is None:
            self._logger.warning(f"摄像头 {camera_id} 有ROI但未找到配置跳过")
            failed_count += 1
            continue
        if not camera.rtsp_url:
            self._logger.warning(f"摄像头 {camera_id} 缺少 rtsp_url跳过")
            failed_count += 1
            continue
        try:
            self._stream_manager.add_stream(
                camera_id=camera.camera_id,
                rtsp_url=camera.rtsp_url,
                target_fps=self._settings.video_stream.default_fps,
                on_frame_callback=self._create_frame_callback(camera.camera_id),
            )
        except Exception as e:
            self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}")
            failed_count += 1
        else:
            self._logger.info(f"已添加摄像头: {camera.camera_id}")
            success_count += 1
    self._logger.info(
        f"摄像头加载完成 - 成功: {success_count}, 失败: {failed_count}, "
        f"总计: {len(roi_camera_ids)}"
    )
def _reload_cameras(self):
    """After a config update: start streams for cameras that gained ROIs.

    Only cameras that now have ROI configs but no running stream are added
    and started; already-running streams are left untouched.
    """
    if not self._stream_manager or not self._config_manager:
        return
    try:
        roi_camera_ids = self._get_camera_ids_with_roi()
        if not roi_camera_ids:
            self._logger.debug("配置更新后未找到有ROI配置的摄像头")
            return
        running = set(self._stream_manager._streams.keys())
        pending_ids = roi_camera_ids - running
        if not pending_ids:
            self._logger.debug("配置更新后无需新增摄像头流")
            return
        self._logger.info(f"检测到 {len(pending_ids)} 个新摄像头需要启动")
        success_count = 0
        failed_count = 0
        for camera_id in pending_ids:
            camera = self._get_camera_config_by_id(camera_id)
            if camera is None:
                self._logger.warning(f"摄像头 {camera_id} 有ROI但未找到配置跳过")
                failed_count += 1
                continue
            if not camera.rtsp_url:
                self._logger.warning(f"摄像头 {camera_id} 缺少 rtsp_url跳过")
                failed_count += 1
                continue
            try:
                self._stream_manager.add_stream(
                    camera_id=camera.camera_id,
                    rtsp_url=camera.rtsp_url,
                    target_fps=self._settings.video_stream.default_fps,
                    on_frame_callback=self._create_frame_callback(camera.camera_id),
                )
                # Start the freshly added stream immediately.
                self._stream_manager._streams[camera.camera_id].start()
            except Exception as e:
                self._logger.error(f"动态添加摄像头失败 {camera.camera_id}: {e}")
                failed_count += 1
            else:
                success_count += 1
                self._logger.info(f"动态添加并启动摄像头: {camera.camera_id}")
        if success_count > 0:
            self._logger.info(
                f"配置更新后新增摄像头完成 - 成功: {success_count}, 失败: {failed_count}"
            )
    except Exception as e:
        self._logger.error(f"动态加载摄像头失败: {e}")
def _cleanup_cameras_without_roi(self):
    """After a config update: stop streams for cameras that lost all ROIs.

    NOTE(review): _get_camera_ids_with_roi() also returns an empty set when
    the ROI fetch fails, in which case this would tear down every running
    stream — confirm that is the intended failure behavior.
    """
    if not self._stream_manager or not self._config_manager:
        return
    try:
        roi_camera_ids = self._get_camera_ids_with_roi()
        running = set(self._stream_manager._streams.keys())
        stale_ids = running - roi_camera_ids
        if not stale_ids:
            self._logger.debug("无需清理摄像头视频流")
            return
        self._logger.info(f"检测到 {len(stale_ids)} 个摄像头无ROI配置需要停止")
        removed_count = 0
        for camera_id in stale_ids:
            try:
                self._stream_manager.remove_stream(camera_id)
            except Exception as e:
                self._logger.error(f"移除摄像头流失败 {camera_id}: {e}")
            else:
                removed_count += 1
                self._logger.info(f"已停止并移除摄像头流: {camera_id} (无ROI配置)")
        if removed_count > 0:
            self._logger.info(f"清理完成 - 已移除 {removed_count} 个无ROI的摄像头流")
    except Exception as e:
        self._logger.error(f"清理摄像头流失败: {e}")
def _create_frame_callback(self, camera_id: str):
    """Return a per-camera callback that forwards frames into the pipeline."""
    return lambda frame: self._process_frame(camera_id, frame)
def _process_frame(self, camera_id: str, frame: VideoFrame):
    """Crop every enabled ROI/binding from the frame and enqueue for batching.

    Producer side of the batch-inference pipeline: each enabled
    ROI-binding yields a preprocessed crop that is appended to the shared
    batch queue; the inference worker is then woken via the batch event.
    The processed-frame counter only advances for frames that produced at
    least one ROI item.
    """
    try:
        roi_configs = self._config_manager.get_roi_configs_with_bindings(camera_id)
        # Periodic heartbeat (every 100 processed frames) — diagnostic only,
        # hence DEBUG level.
        if self._performance_stats["total_frames_processed"] % 100 == 0:
            self._logger.debug(f"[{camera_id}] 已处理 {self._performance_stats['total_frames_processed']} 帧, ROI数: {len(roi_configs)}")
        pending = []
        for roi in roi_configs:
            if not roi.enabled or not roi.bindings:
                continue
            for bind in roi.bindings:
                if not bind.enabled:
                    continue
                try:
                    cropped, scale_info = self._preprocessor.preprocess_single(
                        frame.image, roi
                    )
                except Exception as e:
                    self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}")
                else:
                    pending.append((camera_id, roi, bind, frame, cropped, scale_info))
        if not pending:
            return
        with self._batch_lock:
            self._batch_roi_queue.extend(pending)
        # Wake the inference worker: new data is available.
        self._batch_event.set()
        self._performance_stats["total_frames_processed"] += 1
    except Exception as e:
        self._logger.error(f"处理帧失败 {camera_id}: {e}")
def _batch_process_rois(self):
    """Drain the ROI queue and run one batched inference over all crops.

    Each queue item is (camera_id, roi, bind, frame, cropped, scale_info).
    All crops are stacked into a single [N,3,H,W] batch and inferred in one
    engine call; per-item detections are then dispatched to
    _handle_detections (which runs even for zero detections, since
    absence-based algorithms need the "nothing detected" signal).
    """
    with self._batch_lock:
        roi_items = self._batch_roi_queue
        if not roi_items:
            return
        self._batch_roi_queue = []
    try:
        images = [item[4] for item in roi_items]
        # True batching: stack every ROI crop into one [N,3,H,W] tensor.
        batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch(
            images
        )
        engine = self._engine_manager.get_engine("default")
        if engine is None:
            # Fix: previously the dequeued items vanished without a trace.
            self._logger.debug(f"推理引擎不可用,丢弃 {len(roi_items)} 个 ROI 请求")
            return
        # One inference call for the whole batch.
        outputs, inference_time_ms = engine.infer(batch_data)
        # Diagnostic: raw output shapes (DEBUG level, not an alert log).
        # Fix: numpy is now imported at module level instead of per call.
        if isinstance(outputs, np.ndarray):
            self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms")
        elif isinstance(outputs, (list, tuple)):
            shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs]
            self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms")
        batch_size = len(roi_items)
        batch_results = self._postprocessor.batch_process_detections(
            outputs,
            batch_size,
            conf_threshold=self._settings.inference.conf_threshold
        )
        total_detections = sum(len(r[0]) for r in batch_results)
        self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}")
        for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(roi_items):
            boxes, scores, class_ids = batch_results[idx]
            # Always invoke the algorithm, even with no detections
            # (leave-post detection relies on the "no person" signal).
            self._handle_detections(
                camera_id, roi, bind, frame,
                boxes, scores, class_ids,
                scale_info
            )
    except Exception as e:
        self._logger.error(f"批量处理 ROI 失败: {e}")
def _process_roi_frame(self, camera_id: str, frame: VideoFrame, roi):
"""收集 ROI 帧数据 - 已集成到 _process_frame 中"""
pass
def _build_tracks(
    self,
    roi,
    boxes: Any,
    scores: Any,
    class_ids: Any,
    scale_info: tuple
) -> list:
    """Convert raw detections into the track dicts the algorithms consume.

    Box coordinates are mapped back from letterbox space into the ROI crop
    space via the preprocessor's scale_info.

    Args:
        roi: ROI config object (supplies roi_id).
        boxes, scores, class_ids: parallel per-detection sequences.
        scale_info: letterbox scaling metadata from preprocessing.

    Returns:
        list[dict]: one pseudo-track per detection — there is no temporal
        tracking here; track_id is simply "<roi_id>_<index>".
    """
    # Fix: annotations used the builtin `any` instead of typing.Any.
    class_names = getattr(self._settings, 'class_names', ['person'])
    # Restore letterbox coordinates to the ROI crop space.
    reverted_boxes = self._preprocessor.revert_boxes(
        [box.tolist() if hasattr(box, 'tolist') else list(box) for box in boxes],
        scale_info
    )
    tracks = []
    for i, box in enumerate(reverted_boxes):
        # Fix: explicit None check — the old truthiness test conflated a
        # missing class id with class id 0.
        class_id = int(class_ids[i]) if class_ids[i] is not None else 0
        tracks.append({
            "track_id": f"{roi.roi_id}_{i}",
            "class": class_names[class_id] if class_id < len(class_names) else f"class_{class_id}",
            "confidence": float(scores[i]),
            "bbox": box,
            "matched_rois": [{"roi_id": roi.roi_id}],
        })
    return tracks
def _handle_detections(
self,
camera_id: str,
roi,
bind,
frame: VideoFrame,
boxes: any,
scores: any,
class_ids: any,
scale_info: tuple
):
"""处理检测结果 - 算法接管判断权"""
try:
if self._algorithm_manager is None:
self._logger.warning("算法管理器不可用,跳过算法处理")
return
if self._reporter is None:
self._logger.debug("ResultReporter 未启用,跳过告警上报")
return
roi_id = roi.roi_id
algo_code = bind.algo_code
algo_params = bind.params or {}
# 诊断日志:检测到目标(非告警诊断日志,使用 DEBUG 级别)
if len(boxes) > 0:
self._logger.debug(f"[{camera_id}] ROI={roi_id[:8]} 检测到 {len(boxes)} 个目标, algo={algo_code}")
self._algorithm_manager.register_algorithm(
roi_id=roi_id,
bind_id=bind.bind_id,
algorithm_type=algo_code,
params=algo_params
)
tracks = self._build_tracks(roi, boxes, scores, class_ids, scale_info)
# 离岗检测需要"无人"信号,不能因为 tracks 为空就跳过算法
# 诊断日志tracks 内容(非告警诊断日志,使用 DEBUG 级别)
self._logger.debug(f"[{camera_id}] tracks: {[t.get('class') for t in tracks]}, target_class={bind.target_class}")
alerts = self._algorithm_manager.process(
roi_id=roi_id,
bind_id=bind.bind_id,
camera_id=camera_id,
algorithm_type=algo_code,
tracks=tracks,
current_time=frame.timestamp
)
# 诊断日志:算法处理结果
if alerts:
self._logger.info(f"[{camera_id}] 算法 {algo_code} 返回 {len(alerts)} 个告警")
else:
# 获取算法状态用于诊断(非告警诊断日志,使用 DEBUG 级别)
algo_status = self._algorithm_manager.get_status(roi_id)
self._logger.debug(f"[{camera_id}] 算法 {algo_code} 无告警, 状态: {algo_status}")
for alert in alerts:
alert_type = alert.get("alert_type", "detection")
# resolve 事件:更新已有告警,不创建新告警
if alert_type == "alarm_resolve":
resolve_data = {
"alarm_id": alert.get("resolve_alarm_id"),
"duration_ms": alert.get("duration_ms"),
"last_frame_time": alert.get("last_frame_time"),
"resolve_type": alert.get("resolve_type"),
}
if self._reporter:
self._reporter.report_alarm_resolve(resolve_data)
self._logger.info(
f"离岗告警结束: alarm_id={resolve_data['alarm_id']}, "
f"duration_ms={resolve_data['duration_ms']}, "
f"reason={resolve_data['resolve_type']}"
)
continue
# 摄像头级别去重:同一摄像头+告警类型在冷却期内只上报一次
dedup_key = f"{camera_id}_{alert_type}"
now = frame.timestamp
last_alert_time = self._camera_alert_cooldown.get(dedup_key)
if last_alert_time is not None:
elapsed = (now - last_alert_time).total_seconds()
if elapsed < self._camera_cooldown_seconds:
self._logger.debug(
f"[去重] 跳过告警: camera={camera_id}, type={alert_type}, "
f"roi={roi_id}, 距上次={elapsed:.1f}s < {self._camera_cooldown_seconds}s"
)
continue
self._camera_alert_cooldown[dedup_key] = now
self._performance_stats["total_alerts_generated"] += 1
# 获取算法的离岗开始时间
leave_start_time = None
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time:
leave_start_time = algo._leave_start_time.isoformat()
from core.result_reporter import AlarmInfo, generate_alarm_id
alarm_info = AlarmInfo(
alarm_id=generate_alarm_id(self._settings.mqtt.device_id),
alarm_type=alert_type,
device_id=camera_id,
scene_id=roi_id,
event_time=frame.timestamp.isoformat(),
alarm_level=alert.get("alarm_level", 2),
algorithm_code=algo_code,
confidence_score=alert.get("confidence", 1.0),
ext_data={
"duration_ms": int(alert.get("duration_minutes", 0) * 60 * 1000) if alert.get("duration_minutes") else None,
"roi_id": roi_id,
"bbox": alert.get("bbox", []),
"target_class": alert.get("class", bind.target_class or "unknown"),
"bind_id": bind.bind_id,
"message": alert.get("message", ""),
"edge_node_id": self._settings.mqtt.device_id,
"first_frame_time": leave_start_time,
},
)
self._reporter.report_alarm(alarm_info, screenshot=frame.image)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪)
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
self._logger.info(
f"告警已生成: type={alert_type}, "
f"camera={camera_id}, roi={roi_id}, "
f"confidence={alert.get('confidence', 1.0)}"
)
except Exception as e:
self._logger.error(f"处理检测结果失败: {e}")
def _inference_worker(self):
"""推理线程:攒批窗口内收集 ROI 请求,批量推理"""
while not self._stop_event.is_set():
# 等待有新数据到达或超时
self._batch_event.wait(timeout=self._batch_timeout_sec)
self._batch_event.clear()
with self._batch_lock:
queue_size = len(self._batch_roi_queue)
# 攒批窗口:等到攒够 max_batch 或超时后处理
if queue_size > 0 and queue_size < self._max_batch_size:
# 再等一小段时间凑更多
self._batch_event.wait(timeout=self._batch_timeout_sec)
self._batch_event.clear()
with self._batch_lock:
if not self._batch_roi_queue:
continue
self._batch_process_rois()
def start(self):
    """Start camera streams and the inference worker, then block until shutdown.

    Idempotent: a second call while running is a no-op. This method does not
    return until stop() (or a SIGINT/SIGTERM) fires.
    """
    if self._running:
        return
    self._running = True
    self._stop_event.clear()
    self._load_cameras()
    # Dedicated inference thread (consumer side of the ROI batch queue).
    worker = threading.Thread(
        target=self._inference_worker,
        name="InferenceWorker",
        daemon=True,
    )
    self._inference_thread = worker
    worker.start()
    self._logger.info("推理线程已启动")
    self._stream_manager.start_all()
    self._logger.info("Edge_Inference_Service 已启动")
    self._register_signal_handlers()
    self._wait_for_shutdown()
def _register_signal_handlers(self):
    """Install SIGINT/SIGTERM handlers that trigger a graceful stop."""
    def handle_signal(signum, frame):
        self._logger.info(f"收到信号 {signum}, 正在停止服务...")
        self.stop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle_signal)
def _wait_for_shutdown(self):
    """Block the calling thread until the stop event is set.

    Fix: waits on the Event itself instead of time.sleep(1), so the loop
    wakes immediately when stop() fires rather than sleeping out the
    remainder of a fixed 1-second poll.
    """
    while not self._stop_event.is_set():
        self._stop_event.wait(timeout=1)
def stop(self):
    """Stop all components in reverse dependency order and log final stats.

    Idempotent: a call while not running is a no-op, so it is safe to invoke
    from signal handlers, KeyboardInterrupt paths, and finally blocks.
    """
    if not self._running:
        return
    self._running = False
    self._stop_event.set()
    self._batch_event.set()  # wake the inference thread so it can exit
    if self._inference_thread and self._inference_thread.is_alive():
        self._inference_thread.join(timeout=5)
    if self._stream_manager:
        self._stream_manager.stop_all()
        self._stream_manager.close()
    if self._engine_manager:
        self._engine_manager.release_all()
    if self._config_manager:
        self._config_manager.stop_config_subscription()
        self._config_manager.close()
    if self._algorithm_manager:
        self._algorithm_manager.stop_config_subscription()
    if self._alarm_worker:
        self._alarm_worker.stop()
    if self._reporter:
        self._reporter.close()
    if self._debug_http_server:
        try:
            self._debug_http_server.shutdown()
        except Exception:
            pass
    # Fix: start_time is None when the service is stopped before
    # initialize() completed; the old code raised TypeError on the
    # datetime subtraction in that case.
    start_time = self._performance_stats["start_time"]
    if start_time is not None:
        self._performance_stats["uptime_seconds"] = (
            (datetime.now() - start_time).total_seconds()
        )
    self._logger.info("Edge_Inference_Service 已停止")
    self._logger.info(f"运行统计: {self._performance_stats}")
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of service state and performance counters."""
    stats = self._performance_stats
    started = stats["start_time"]
    return {
        "running": self._running,
        "start_time": started.isoformat() if started else None,
        "uptime_seconds": stats["uptime_seconds"],
        "total_frames_processed": stats["total_frames_processed"],
        "total_alerts_generated": stats["total_alerts_generated"],
        "stream_manager": (
            self._stream_manager.get_statistics() if self._stream_manager else {}
        ),
        "config_version": (
            self._config_manager.config_version if self._config_manager else None
        ),
    }
def main():
    """Entry point: build, initialize, and run the service until shutdown.

    start() blocks until a stop signal; cleanup is guaranteed via the
    finally block — stop() is idempotent (it guards on _running), so calling
    it on every exit path is safe.
    """
    service = EdgeInferenceService()
    try:
        service.initialize()
        service.start()
    except KeyboardInterrupt:
        # Normal interactive shutdown; cleanup happens in finally.
        pass
    except Exception:
        # Fix: log the full traceback (logger.exception), not just the
        # message, then re-raise for a non-zero exit.
        logger.exception("服务异常")
        raise
    finally:
        # Fix: previously a non-KeyboardInterrupt exception left streams,
        # threads, and connections running; stop() now runs on every path.
        service.stop()


if __name__ == "__main__":
    main()