Files
security-ai-edge/main.py
16337 0b0e793785 fix(edge): 配置更新时保留算法状态,避免重复告警
- 新增 update_algorithm_params() 方法,仅更新参数不重置状态机
- 修改 reload_all_algorithms() 支持 preserve_state 参数
  - preserve_state=True: 保留状态机,仅更新参数(配置更新)
  - preserve_state=False: 完全重置(手动重启)
- 修改 main.py 配置更新回调使用 preserve_state=True

修复问题:配置更新导致算法状态机重置,周界入侵CONFIRMING_CLEAR状态丢失,重新生成新告警
现在配置更新时保留告警状态,不会产生重复告警

支持算法:
- leave_post: 更新 leave_countdown_sec, working_hours 等参数
- intrusion: 更新 confirm_intrusion_seconds, confirm_clear_seconds 等参数

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 11:24:41 +08:00

905 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
主入口模块
整合所有模块,启动推理服务
"""
import logging
import os
import sys
import threading
import signal
import time
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
from config.settings import get_settings, Settings
from core.config_sync import get_config_sync_manager, ConfigSyncManager
from core.debug_http_server import start_debug_http_server
from core.video_stream import MultiStreamManager, VideoFrame
from core.preprocessor import ImagePreprocessor
from core.tensorrt_engine import TensorRTEngine, EngineManager
from core.postprocessor import PostProcessor
from core.result_reporter import ResultReporter
from core.alarm_upload_worker import AlarmUploadWorker
from algorithms import AlgorithmManager
from utils.logger import get_logger, StructuredLogger
from utils.version_control import get_version_control
logger = logging.getLogger(__name__)
class EdgeInferenceService:
    """Main edge-inference service.

    Wires together config sync, video streams, preprocessing, TensorRT
    inference, postprocessing, algorithm evaluation and alarm reporting.
    """

    def __init__(self):
        self._running = False
        self._settings = get_settings()
        self._logger = get_logger("main")
        self._version_control = get_version_control()
        # Component handles; populated by initialize(), None until then.
        self._db_manager = None
        self._config_manager: Optional[ConfigSyncManager] = None
        self._stream_manager: Optional[MultiStreamManager] = None
        self._preprocessor: Optional[ImagePreprocessor] = None
        self._engine_manager: Optional[EngineManager] = None
        self._postprocessor: Optional[PostProcessor] = None
        self._reporter: Optional[ResultReporter] = None
        self._alarm_worker: Optional[AlarmUploadWorker] = None
        self._algorithm_manager: Optional[AlgorithmManager] = None
        self._debug_reload_thread: Optional[threading.Thread] = None
        self._debug_http_server = None
        self._debug_http_thread: Optional[threading.Thread] = None
        self._processing_threads: Dict[str, threading.Thread] = {}
        self._stop_event = threading.Event()
        # Aggregate runtime counters; uptime_seconds is filled in on stop().
        self._performance_stats = {
            "start_time": None,
            "total_frames_processed": 0,
            "total_alerts_generated": 0,
            "uptime_seconds": 0,
        }
        # Producer/consumer queue of preprocessed ROI crops awaiting
        # batched inference (guarded by _batch_lock, signalled by _batch_event).
        self._batch_roi_queue: List[tuple] = []
        self._batch_lock = threading.Lock()
        self._batch_event = threading.Event()
        self._inference_thread: Optional[threading.Thread] = None
        self._max_batch_size = 8
        self._batch_timeout_sec = 0.05  # 50 ms batching window
        # Camera-level alert de-duplication: one report per camera + alert
        # type within the cooldown period.
        self._camera_alert_cooldown: Dict[str, datetime] = {}
        self._camera_cooldown_seconds = 30  # min interval (s) between same-type alerts per camera
        self._logger.info("Edge_Inference_Service 初始化开始")
def _init_database(self):
    """Initialise the local SQLite store.

    Failures are tolerated: the service keeps running in a no-database
    mode with _db_manager left as None.
    """
    try:
        from config.database import SQLiteManager
        self._db_manager = SQLiteManager()
        # NOTE(review): probes the private _conn attribute to test
        # connectivity -- presumably None when the DB could not be opened.
        if self._db_manager._conn:
            self._logger.info("数据库初始化成功")
        else:
            self._logger.warning("数据库不可见,服务将在无数据库模式下运行")
    except Exception as e:
        self._logger.warning(f"数据库初始化失败,服务将在无数据库模式下运行: {e}")
        self._db_manager = None
def _init_config_manager(self):
    """Initialise config sync and register the config-update callback.

    In LOCAL mode a config update triggers:
      1. a state-preserving parameter reload of all algorithms
         (preserve_state=True keeps alarm state machines intact and
         prevents duplicate alarms after a config push),
      2. removal of streams for cameras that lost all ROIs,
      3. asynchronous start-up of newly configured camera streams.

    Raises:
        Any exception from config-sync setup; config sync is mandatory.
    """
    try:
        self._config_manager = get_config_sync_manager()
        self._config_manager.start_config_subscription()
        if self._settings.config_sync_mode == "LOCAL" and self._config_manager:
            def _on_config_update(topic, data):
                if self._algorithm_manager:
                    # Update parameters while preserving state to avoid
                    # duplicate alarms.
                    self._algorithm_manager.reload_all_algorithms(preserve_state=True)
                # Drop streams for cameras without any ROI after the update.
                self._cleanup_cameras_without_roi()
                # Load new camera streams asynchronously so the HTTP
                # response is not blocked.
                import threading
                threading.Thread(
                    target=self._reload_cameras,
                    name="CameraReloader",
                    daemon=True
                ).start()
            self._config_manager.register_callback("config_update", _on_config_update)
        self._logger.info("配置管理器初始化成功")
    except Exception as e:
        self._logger.error(f"配置管理器初始化失败: {e}")
        raise
def _init_stream_manager(self):
    """Create the multi-camera video stream manager."""
    manager = MultiStreamManager()
    self._stream_manager = manager
    self._logger.info("流管理器初始化成功")

def _init_preprocessor(self):
    """Create the image preprocessor used for ROI cropping/letterboxing."""
    preprocessor = ImagePreprocessor()
    self._preprocessor = preprocessor
    self._logger.info("预处理器初始化成功")
def _init_engine(self):
    """Load the default TensorRT engine from settings.

    A missing engine file is only a warning (the service starts without
    a loaded engine); any other failure is fatal and re-raised.
    """
    try:
        self._engine_manager = EngineManager()
        engine_path = self._settings.inference.model_path
        if os.path.exists(engine_path):
            self._engine_manager.load_engine("default", engine_path)
            self._logger.info(f"推理引擎加载成功: {engine_path}")
        else:
            self._logger.warning(f"引擎文件不存在: {engine_path}")
    except Exception as e:
        self._logger.error(f"推理引擎初始化失败: {e}")
        raise
def _init_postprocessor(self):
    """Create the detection postprocessor (NMS / decoding)."""
    postprocessor = PostProcessor()
    self._postprocessor = postprocessor
    self._logger.info("后处理器初始化成功")
def _init_reporter(self):
    """Initialise the alarm result reporter and the upload worker.

    Skipped entirely in LOCAL mode when alarm upload is disabled.
    Failures of either component are logged with a traceback but do not
    abort service start-up.

    Fix: the original placed the docstring string AFTER the early-return
    guard, making it a no-op expression statement instead of a docstring.
    """
    # In LOCAL mode with uploads disabled there is nothing to report to.
    if self._settings.config_sync_mode == "LOCAL" and not self._settings.alarm_upload_enabled:
        self._logger.info("LOCAL 模式且 ALARM_UPLOAD_ENABLED=0跳过告警上报组件初始化")
        return
    try:
        self._reporter = ResultReporter()
        self._logger.info("ResultReporter 对象已创建,准备初始化...")
        self._reporter.initialize()  # sets up the Redis connection and local storage
        self._logger.info("结果上报器初始化成功")
    except Exception as e:
        self._logger.error(f"结果上报器初始化失败: {e}")
        import traceback
        self._logger.error(traceback.format_exc())
    # Start the background alarm upload worker.
    try:
        self._alarm_worker = AlarmUploadWorker()
        self._alarm_worker.start()
        self._logger.info("告警上报 Worker 启动成功")
    except Exception as e:
        self._logger.error(f"告警上报 Worker 启动失败: {e}")
        import traceback
        self._logger.error(traceback.format_exc())
def _init_algorithm_manager(self):
    """Create the algorithm manager and subscribe it to config updates.

    Failure is logged but non-fatal: detections are then skipped in
    _handle_detections.
    """
    try:
        manager = AlgorithmManager()
        self._algorithm_manager = manager
        manager.start_config_subscription()
        self._logger.info("算法管理器初始化成功")
    except Exception as e:
        self._logger.error(f"算法管理器初始化失败: {e}")
def _start_debug_reload_watcher(self):
    """Local debug helper: watch a signal file and reload config on touch.

    Only active in LOCAL config-sync mode with debug enabled. A daemon
    thread polls the signal file's mtime once per second; on change it
    reloads the local config file and fully reloads all algorithms
    (state IS reset on this path -- it represents a manual reload,
    unlike the preserve_state=True path in _init_config_manager).
    """
    if self._settings.config_sync_mode != "LOCAL":
        return
    if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled:
        return
    if not self._config_manager:
        return
    signal_file = self._settings.debug.reload_signal_file
    def worker():
        last_mtime = None
        self._logger.info(f"[DEBUG] 本地同步模式已启用,监听: {signal_file}")
        while not self._stop_event.is_set():
            try:
                if os.path.exists(signal_file):
                    mtime = os.path.getmtime(signal_file)
                    if last_mtime is None:
                        # First observation only records the baseline.
                        last_mtime = mtime
                    elif mtime != last_mtime:
                        last_mtime = mtime
                        ok = self._config_manager.reload_local_config_from_file()
                        if self._algorithm_manager:
                            self._algorithm_manager.reload_all_algorithms()
                        self._logger.info(f"[DEBUG] 本地配置已重新加载: {ok}")
                time.sleep(1.0)
            except Exception as e:
                self._logger.warning(f"[DEBUG] 监听本地配置失败: {e}")
                time.sleep(1.0)
    self._debug_reload_thread = threading.Thread(
        target=worker,
        name="LocalConfigReloadWatcher",
        daemon=True,
    )
    self._debug_reload_thread.start()
def _start_debug_http_server(self):
    """Local debug helper: start the HTTP sync endpoint (LOCAL mode only).

    Idempotent: does nothing when the server is already running, or when
    debug mode / LOCAL sync mode is not enabled. The server loop runs on
    a daemon thread.
    """
    if self._settings.config_sync_mode != "LOCAL":
        return
    if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled:
        return
    if self._debug_http_server is not None:
        return
    host = self._settings.debug.host
    port = self._settings.debug.port
    self._debug_http_server = start_debug_http_server(host, port)
    def worker():
        try:
            self._debug_http_server.serve_forever()
        except Exception as e:
            self._logger.warning(f"[DEBUG] HTTP 服务器异常: {e}")
    self._debug_http_thread = threading.Thread(
        target=worker,
        name="DebugHttpServer",
        daemon=True,
    )
    self._debug_http_thread.start()
def initialize(self):
    """Initialise every component in dependency order.

    Order matters: database and config sync first, then media and
    inference components, then reporting and algorithms, finally the
    debug-only helpers. Records the start time and writes a
    version-control audit entry.
    """
    self._logger.info("=" * 50)
    self._logger.info("Edge_Inference_Service 启动")
    self._logger.info("=" * 50)
    self._init_database()
    self._init_config_manager()
    self._init_stream_manager()
    self._init_preprocessor()
    self._init_engine()
    self._init_postprocessor()
    self._init_reporter()
    self._init_algorithm_manager()
    self._start_debug_reload_watcher()
    self._start_debug_http_server()
    self._performance_stats["start_time"] = datetime.now()
    self._version_control.record_update(
        version="1.0.0",
        update_type="启动",
        description="Edge_Inference_Service 启动运行",
        updated_by="系统",
        affected_items=["全局"],
    )
    self._logger.info("所有组件初始化完成")
def _get_camera_ids_with_roi(self) -> set:
    """Return the camera IDs that have at least one ROI configured.

    Returns:
        set: camera IDs referenced by ROI configs (empty/None camera_id
        entries are dropped); empty set when the lookup fails.
    """
    try:
        rois = self._config_manager.get_roi_configs(force_refresh=True)
        return {r.camera_id for r in rois if r.camera_id}
    except Exception as e:
        self._logger.error(f"获取ROI配置失败: {e}")
        return set()
def _get_camera_config_by_id(self, camera_id: str):
    """Look up the camera config matching an ID.

    Args:
        camera_id: camera identifier to find.

    Returns:
        The matching camera config object, or None when no camera
        matches or the lookup raises.
    """
    try:
        return next(
            (cam for cam in self._config_manager.get_cameras()
             if cam.camera_id == camera_id),
            None,
        )
    except Exception as e:
        self._logger.error(f"获取摄像头配置失败 {camera_id}: {e}")
        return None
def _load_cameras(self):
    """Start video streams, but only for cameras that have ROI configs.

    Logic:
      1. collect camera IDs referenced by ROI configs,
      2. start a stream only for those cameras,
      3. skip cameras with no config entry or missing rtsp_url.
    Avoids opening streams that nothing analyses, saving resources.
    """
    # Camera IDs that have at least one ROI.
    camera_ids_with_roi = self._get_camera_ids_with_roi()
    if not camera_ids_with_roi:
        self._logger.warning("未找到任何ROI配置不启动视频流")
        return
    self._logger.info(f"检测到 {len(camera_ids_with_roi)} 个摄像头有ROI配置")
    # Only start cameras that have ROIs.
    success_count = 0
    failed_count = 0
    for camera_id in camera_ids_with_roi:
        camera = self._get_camera_config_by_id(camera_id)
        if not camera:
            self._logger.warning(f"摄像头 {camera_id} 有ROI但未找到配置跳过")
            failed_count += 1
            continue
        if not camera.rtsp_url:
            self._logger.warning(f"摄像头 {camera_id} 缺少 rtsp_url跳过")
            failed_count += 1
            continue
        try:
            self._stream_manager.add_stream(
                camera_id=camera.camera_id,
                rtsp_url=camera.rtsp_url,
                target_fps=self._settings.video_stream.default_fps,
                on_frame_callback=self._create_frame_callback(camera.camera_id)
            )
            self._logger.info(f"已添加摄像头: {camera.camera_id}")
            success_count += 1
        except Exception as e:
            self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}")
            failed_count += 1
    self._logger.info(
        f"摄像头加载完成 - 成功: {success_count}, 失败: {failed_count}, "
        f"总计: {len(camera_ids_with_roi)}"
    )
def _reload_cameras(self):
    """After a config update, start streams for newly configured cameras.

    Logic:
      1. collect camera IDs that currently have ROI configs,
      2. subtract the streams that are already running,
      3. add and immediately start streams for the remainder.
    Invoked on a background thread (see _init_config_manager).
    """
    if not self._stream_manager or not self._config_manager:
        return
    try:
        # Camera IDs that have at least one ROI.
        camera_ids_with_roi = self._get_camera_ids_with_roi()
        if not camera_ids_with_roi:
            self._logger.debug("配置更新后未找到有ROI配置的摄像头")
            return
        # Streams already running.
        # NOTE(review): reads the manager's private _streams dict directly.
        existing_streams = set(self._stream_manager._streams.keys())
        # Cameras that have ROIs but no running stream yet.
        new_camera_ids = camera_ids_with_roi - existing_streams
        if not new_camera_ids:
            self._logger.debug("配置更新后无需新增摄像头流")
            return
        self._logger.info(f"检测到 {len(new_camera_ids)} 个新摄像头需要启动")
        # Dynamically add the new cameras.
        success_count = 0
        failed_count = 0
        for camera_id in new_camera_ids:
            camera = self._get_camera_config_by_id(camera_id)
            if not camera:
                self._logger.warning(f"摄像头 {camera_id} 有ROI但未找到配置跳过")
                failed_count += 1
                continue
            if not camera.rtsp_url:
                self._logger.warning(f"摄像头 {camera_id} 缺少 rtsp_url跳过")
                failed_count += 1
                continue
            try:
                self._stream_manager.add_stream(
                    camera_id=camera.camera_id,
                    rtsp_url=camera.rtsp_url,
                    target_fps=self._settings.video_stream.default_fps,
                    on_frame_callback=self._create_frame_callback(camera.camera_id)
                )
                # Start the freshly added stream immediately.
                self._stream_manager._streams[camera.camera_id].start()
                success_count += 1
                self._logger.info(f"动态添加并启动摄像头: {camera.camera_id}")
            except Exception as e:
                self._logger.error(f"动态添加摄像头失败 {camera.camera_id}: {e}")
                failed_count += 1
        if success_count > 0:
            self._logger.info(
                f"配置更新后新增摄像头完成 - 成功: {success_count}, 失败: {failed_count}"
            )
    except Exception as e:
        self._logger.error(f"动态加载摄像头失败: {e}")
def _cleanup_cameras_without_roi(self):
    """Stop and remove streams for cameras that lost all ROI configs.

    Run after a config update: any running stream whose camera has no
    ROI left is stopped and removed to free decoder resources.
    """
    if not self._stream_manager or not self._config_manager:
        return
    try:
        roi_cameras = self._get_camera_ids_with_roi()
        active_cameras = set(self._stream_manager._streams.keys())
        # Running streams whose cameras no longer carry any ROI.
        stale_cameras = active_cameras - roi_cameras
        if not stale_cameras:
            self._logger.debug("无需清理摄像头视频流")
            return
        self._logger.info(f"检测到 {len(stale_cameras)} 个摄像头无ROI配置需要停止")
        removed = 0
        for camera_id in stale_cameras:
            try:
                self._stream_manager.remove_stream(camera_id)
                removed += 1
                self._logger.info(f"已停止并移除摄像头流: {camera_id} (无ROI配置)")
            except Exception as e:
                self._logger.error(f"移除摄像头流失败 {camera_id}: {e}")
        if removed > 0:
            self._logger.info(f"清理完成 - 已移除 {removed} 个无ROI的摄像头流")
    except Exception as e:
        self._logger.error(f"清理摄像头流失败: {e}")
def _create_frame_callback(self, camera_id: str):
    """Bind camera_id into a per-stream frame callback closure."""
    return lambda frame: self._process_frame(camera_id, frame)
def _process_frame(self, camera_id: str, frame: VideoFrame):
    """Per-frame callback: crop enabled ROIs and enqueue them for batching.

    For every enabled ROI/binding pair, the frame is preprocessed into a
    crop and appended to the shared batch queue that the inference
    worker thread consumes; the worker is then woken via _batch_event.
    Errors are logged, never raised into the stream thread.
    """
    try:
        roi_configs = self._config_manager.get_roi_configs_with_bindings(camera_id)
        # Progress trace every 100 frames (diagnostic, DEBUG level).
        if self._performance_stats["total_frames_processed"] % 100 == 0:
            self._logger.debug(f"[{camera_id}] 已处理 {self._performance_stats['total_frames_processed']} 帧, ROI数: {len(roi_configs)}")
        roi_items = []
        for roi in roi_configs:
            if not roi.enabled:
                continue
            if not roi.bindings:
                continue
            for bind in roi.bindings:
                if not bind.enabled:
                    continue
                try:
                    cropped, scale_info = self._preprocessor.preprocess_single(
                        frame.image, roi
                    )
                    roi_items.append((camera_id, roi, bind, frame, cropped, scale_info))
                except Exception as e:
                    self._logger.error(f"预处理 ROI 失败 {roi.roi_id}: {e}")
        if not roi_items:
            return
        with self._batch_lock:
            self._batch_roi_queue.extend(roi_items)
        # Wake the inference thread: new data is available.
        self._batch_event.set()
        self._performance_stats["total_frames_processed"] += 1
    except Exception as e:
        self._logger.error(f"处理帧失败 {camera_id}: {e}")
def _batch_process_rois(self):
    """Drain the ROI queue and run one batched inference over all crops.

    The queued crops are stacked into a single batch tensor and run
    through the engine once; the per-ROI detections are then dispatched
    to _handle_detections. Dispatch happens even for empty results --
    leave-post detection needs the "nobody present" signal.
    """
    # Swap the queue out under the lock so producers can keep appending.
    with self._batch_lock:
        roi_items = self._batch_roi_queue
        if not roi_items:
            return
        self._batch_roi_queue = []
    try:
        images = [item[4] for item in roi_items]
        scale_infos = [item[5] for item in roi_items]
        # True batching: stack all ROI crops into [N,3,H,W] for one pass.
        batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch(
            images
        )
        engine = self._engine_manager.get_engine("default")
        if engine is None:
            return
        # Single inference over the whole batch.
        outputs, inference_time_ms = engine.infer(batch_data)
        # Diagnostics: raw output shapes (non-alarm, DEBUG level).
        import numpy as np
        if isinstance(outputs, np.ndarray):
            self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms")
        elif isinstance(outputs, (list, tuple)):
            shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs]
            self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms")
        batch_size = len(roi_items)
        batch_results = self._postprocessor.batch_process_detections(
            outputs,
            batch_size,
            conf_threshold=self._settings.inference.conf_threshold
        )
        total_detections = sum(len(r[0]) for r in batch_results)
        self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}")
        for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(roi_items):
            boxes, scores, class_ids = batch_results[idx]
            # Always invoke the algorithms, even with zero detections
            # (leave-post relies on the "nobody present" signal).
            self._handle_detections(
                camera_id, roi, bind, frame,
                boxes, scores, class_ids,
                scale_info
            )
    except Exception as e:
        self._logger.error(f"批量处理 ROI 失败: {e}")
def _process_roi_frame(self, camera_id: str, frame: VideoFrame, roi):
    """Legacy hook; ROI collection now happens inside _process_frame."""
    return None
def _build_tracks(
    self,
    roi,
    boxes: any,
    scores: any,
    class_ids: any,
    scale_info: tuple
) -> list:
    """Convert raw detections into the track dicts the algorithms consume.

    Box coordinates are mapped back from letterbox space to the ROI crop
    space before being emitted.

    Args:
        roi: ROI config object (supplies roi_id).
        boxes, scores, class_ids: parallel per-detection sequences.
        scale_info: letterbox scale/pad info from preprocessing.

    Returns:
        list: track dicts with synthetic per-frame track IDs
        ("<roi_id>_<index>").
    """
    tracks = []
    class_names = getattr(self._settings, 'class_names', ['person'])
    # Map letterbox coordinates back into the ROI crop space.
    reverted_boxes = self._preprocessor.revert_boxes(
        [box.tolist() if hasattr(box, 'tolist') else list(box) for box in boxes],
        scale_info
    )
    for i, box in enumerate(reverted_boxes):
        # Explicit None check: class id 0 is a valid class and the old
        # truthiness test (`if class_ids[i]`) only mapped it to 0 by
        # accident; be explicit so the intent survives future edits.
        raw_id = class_ids[i]
        class_id = int(raw_id) if raw_id is not None else 0
        track = {
            "track_id": f"{roi.roi_id}_{i}",
            "class": class_names[class_id] if class_id < len(class_names) else f"class_{class_id}",
            "confidence": float(scores[i]),
            "bbox": box,
            "matched_rois": [{"roi_id": roi.roi_id}],
        }
        tracks.append(track)
    return tracks
def _handle_detections(
    self,
    camera_id: str,
    roi,
    bind,
    frame: VideoFrame,
    boxes: any,
    scores: any,
    class_ids: any,
    scale_info: tuple
):
    """Feed detections to the bound algorithm and report resulting alarms.

    Flow:
      1. (re)register the algorithm instance for this ROI/binding,
      2. build tracks and run the algorithm -- also on empty detections,
         since leave-post needs the "nobody present" signal,
      3. for each alert: handle resolve events, apply the per-camera
         cooldown de-duplication, build and report an AlarmInfo with a
         screenshot, and back-fill the alarm_id into the algorithm
         instance for later resolve tracking.
    All errors are logged and swallowed; nothing propagates to the caller.
    """
    try:
        if self._algorithm_manager is None:
            self._logger.warning("算法管理器不可用,跳过算法处理")
            return
        if self._reporter is None:
            self._logger.debug("ResultReporter 未启用,跳过告警上报")
            return
        roi_id = roi.roi_id
        algo_code = bind.algo_code
        algo_params = bind.params or {}
        # Diagnostic: targets detected (non-alarm, DEBUG level).
        if len(boxes) > 0:
            self._logger.debug(f"[{camera_id}] ROI={roi_id[:8]} 检测到 {len(boxes)} 个目标, algo={algo_code}")
        self._algorithm_manager.register_algorithm(
            roi_id=roi_id,
            bind_id=bind.bind_id,
            algorithm_type=algo_code,
            params=algo_params
        )
        tracks = self._build_tracks(roi, boxes, scores, class_ids, scale_info)
        # Leave-post detection needs the "nobody present" signal, so do
        # not skip the algorithm when tracks is empty.
        # Diagnostic: track contents (non-alarm, DEBUG level).
        self._logger.debug(f"[{camera_id}] tracks: {[t.get('class') for t in tracks]}, target_class={bind.target_class}")
        alerts = self._algorithm_manager.process(
            roi_id=roi_id,
            bind_id=bind.bind_id,
            camera_id=camera_id,
            algorithm_type=algo_code,
            tracks=tracks,
            current_time=frame.timestamp
        )
        # Diagnostic: algorithm outcome.
        if alerts:
            self._logger.info(f"[{camera_id}] 算法 {algo_code} 返回 {len(alerts)} 个告警")
        else:
            # Fetch algorithm state for diagnostics (DEBUG level).
            algo_status = self._algorithm_manager.get_status(roi_id)
            self._logger.debug(f"[{camera_id}] 算法 {algo_code} 无告警, 状态: {algo_status}")
        for alert in alerts:
            alert_type = alert.get("alert_type", "detection")
            # Resolve events update an existing alarm, no new alarm created.
            if alert_type == "alarm_resolve":
                resolve_data = {
                    "alarm_id": alert.get("resolve_alarm_id"),
                    "duration_ms": alert.get("duration_ms"),
                    "last_frame_time": alert.get("last_frame_time"),
                    "resolve_type": alert.get("resolve_type"),
                }
                if self._reporter:
                    self._reporter.report_alarm_resolve(resolve_data)
                self._logger.info(
                    f"离岗告警结束: alarm_id={resolve_data['alarm_id']}, "
                    f"duration_ms={resolve_data['duration_ms']}, "
                    f"reason={resolve_data['resolve_type']}"
                )
                continue
            # Camera-level de-dup: one report per camera + alert type
            # within the cooldown window.
            dedup_key = f"{camera_id}_{alert_type}"
            now = frame.timestamp
            last_alert_time = self._camera_alert_cooldown.get(dedup_key)
            if last_alert_time is not None:
                elapsed = (now - last_alert_time).total_seconds()
                if elapsed < self._camera_cooldown_seconds:
                    self._logger.debug(
                        f"[去重] 跳过告警: camera={camera_id}, type={alert_type}, "
                        f"roi={roi_id}, 距上次={elapsed:.1f}s < {self._camera_cooldown_seconds}s"
                    )
                    continue
            self._camera_alert_cooldown[dedup_key] = now
            self._performance_stats["total_alerts_generated"] += 1
            # Fetch the leave-post start time from the algorithm instance.
            leave_start_time = None
            if alert_type == "leave_post":
                algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
                if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time:
                    leave_start_time = algo._leave_start_time.isoformat()
            from core.result_reporter import AlarmInfo, generate_alarm_id
            alarm_info = AlarmInfo(
                alarm_id=generate_alarm_id(self._settings.mqtt.device_id),
                alarm_type=alert_type,
                device_id=camera_id,
                scene_id=roi_id,
                event_time=frame.timestamp.isoformat(),
                alarm_level=alert.get("alarm_level", 2),
                algorithm_code=algo_code,
                confidence_score=alert.get("confidence", 1.0),
                ext_data={
                    "duration_ms": int(alert.get("duration_minutes", 0) * 60 * 1000) if alert.get("duration_minutes") else None,
                    "roi_id": roi_id,
                    "bbox": alert.get("bbox", []),
                    "target_class": alert.get("class", bind.target_class or "unknown"),
                    "bind_id": bind.bind_id,
                    "message": alert.get("message", ""),
                    "edge_node_id": self._settings.mqtt.device_id,
                    "first_frame_time": leave_start_time,
                },
            )
            self._reporter.report_alarm(alarm_info, screenshot=frame.image)
            # Back-fill alarm_id into the algorithm instance (used for
            # subsequent resolve tracking).
            if alert_type == "leave_post":
                algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
                if algo and hasattr(algo, 'set_last_alarm_id'):
                    algo.set_last_alarm_id(alarm_info.alarm_id)
            elif alert_type == "intrusion":
                algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("intrusion")
                if algo and hasattr(algo, 'set_last_alarm_id'):
                    algo.set_last_alarm_id(alarm_info.alarm_id)
            self._logger.info(
                f"告警已生成: type={alert_type}, "
                f"camera={camera_id}, roi={roi_id}, "
                f"confidence={alert.get('confidence', 1.0)}"
            )
    except Exception as e:
        self._logger.error(f"处理检测结果失败: {e}")
def _inference_worker(self):
    """Inference thread: collect ROI requests in a batching window, then infer.

    Waits on _batch_event (or the batching-window timeout). If the queue
    holds fewer than _max_batch_size items it waits one extra window to
    accumulate more before draining via _batch_process_rois. Exits when
    _stop_event is set (stop() also sets _batch_event to wake it).
    """
    while not self._stop_event.is_set():
        # Wait for new data to arrive, or the window timeout.
        self._batch_event.wait(timeout=self._batch_timeout_sec)
        self._batch_event.clear()
        with self._batch_lock:
            queue_size = len(self._batch_roi_queue)
        # Batching window: process once max_batch is reached or timed out.
        if queue_size > 0 and queue_size < self._max_batch_size:
            # Wait a little longer to accumulate more items.
            self._batch_event.wait(timeout=self._batch_timeout_sec)
            self._batch_event.clear()
        with self._batch_lock:
            if not self._batch_roi_queue:
                continue
        self._batch_process_rois()
def start(self):
    """Start the service: cameras, inference thread, streams; then block.

    Idempotent when already running. Registers SIGINT/SIGTERM handlers
    and blocks the calling thread until stop() sets the stop event.
    """
    if self._running:
        return
    self._running = True
    self._stop_event.clear()
    self._load_cameras()
    # Dedicated inference thread (producer/consumer pattern).
    self._inference_thread = threading.Thread(
        target=self._inference_worker,
        name="InferenceWorker",
        daemon=True
    )
    self._inference_thread.start()
    self._logger.info("推理线程已启动")
    self._stream_manager.start_all()
    self._logger.info("Edge_Inference_Service 已启动")
    self._register_signal_handlers()
    self._wait_for_shutdown()
def _register_signal_handlers(self):
    """Install SIGINT/SIGTERM handlers that trigger a clean shutdown."""
    def _on_signal(signum, frame):
        self._logger.info(f"收到信号 {signum}, 正在停止服务...")
        self.stop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _on_signal)
def _wait_for_shutdown(self):
    """Block the calling thread until the stop event is set, polling once a second."""
    while True:
        if self._stop_event.is_set():
            break
        time.sleep(1)
def stop(self):
    """Stop the service and release all components.

    Idempotent: returns immediately when not running. Shutdown order:
    inference thread -> streams -> engine -> config sync -> algorithms
    -> alarm worker -> reporter -> debug HTTP server.

    Fix: guards against start_time being None (stop() reached without
    initialize()), which previously raised TypeError on datetime - None.
    """
    if not self._running:
        return
    self._running = False
    self._stop_event.set()
    self._batch_event.set()  # wake the inference thread so it can exit
    if self._inference_thread and self._inference_thread.is_alive():
        self._inference_thread.join(timeout=5)
    if self._stream_manager:
        self._stream_manager.stop_all()
        self._stream_manager.close()
    if self._engine_manager:
        self._engine_manager.release_all()
    if self._config_manager:
        self._config_manager.stop_config_subscription()
        self._config_manager.close()
    if self._algorithm_manager:
        self._algorithm_manager.stop_config_subscription()
    if self._alarm_worker:
        self._alarm_worker.stop()
    if self._reporter:
        self._reporter.close()
    if self._debug_http_server:
        try:
            self._debug_http_server.shutdown()
        except Exception:
            pass
    # Only compute uptime when a start time was actually recorded.
    start_time = self._performance_stats["start_time"]
    if start_time is not None:
        self._performance_stats["uptime_seconds"] = (
            (datetime.now() - start_time).total_seconds()
        )
    self._logger.info("Edge_Inference_Service 已停止")
    self._logger.info(f"运行统计: {self._performance_stats}")
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of service state and counters for monitoring."""
    stats = self._performance_stats
    started = stats["start_time"]
    status = {
        "running": self._running,
        "start_time": started.isoformat() if started else None,
        "uptime_seconds": stats["uptime_seconds"],
        "total_frames_processed": stats["total_frames_processed"],
        "total_alerts_generated": stats["total_alerts_generated"],
    }
    status["stream_manager"] = (
        self._stream_manager.get_statistics() if self._stream_manager else {}
    )
    status["config_version"] = (
        self._config_manager.config_version if self._config_manager else None
    )
    return status
def main():
    """Entry point: build, initialise and run the edge inference service.

    Fix: on an unexpected exception the service previously re-raised
    without cleanup, leaving daemon threads and open streams behind;
    stop() is idempotent, so calling it before propagating is safe.
    """
    service = EdgeInferenceService()
    try:
        service.initialize()
        service.start()
    except KeyboardInterrupt:
        service.stop()
    except Exception as e:
        logger.error(f"服务异常: {e}")
        # Best-effort cleanup before propagating; stop() is idempotent.
        service.stop()
        raise


if __name__ == "__main__":
    main()