"""ASGI entry point of the security-monitoring service.

Builds the FastAPI application (``app``), mounts the camera / ROI / alarm /
sync routers, starts and stops the inference pipeline through the lifespan
hook, and exposes a few snapshot and status endpoints.
"""

import asyncio
import base64
import json
import os
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional

# Set before the heavy imports below; presumably it has to be in place before
# TensorRT is loaded, so keep this ordering -- TODO confirm.
os.environ["TENSORRT_DISABLE_MYELIN"] = "1"

import cv2
import numpy as np
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from prometheus_client import start_http_server

from db.models import get_db

# Make the directory containing this file importable for the project-local
# packages imported further down.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from ultralytics.engine.results import Boxes as UltralyticsBoxes


def _patch_boxes_ndim():
    """Add an ``ndim`` property to ultralytics ``Boxes`` if it lacks one.

    The property forwards to ``self.data.ndim``. Presumably this exists for
    code that reads ``.ndim`` directly off a ``Boxes`` object -- verify
    against the callers before removing it.
    """
    if hasattr(UltralyticsBoxes, 'ndim'):
        return
    UltralyticsBoxes.ndim = property(lambda self: self.data.ndim)


_patch_boxes_ndim()

from api.alarm import router as alarm_router
from api.camera import router as camera_router
from api.roi import router as roi_router
from api.sync import router as sync_router
from config import get_config, load_config
from db.models import init_db
from inference.pipeline import get_pipeline, start_pipeline, stop_pipeline
from utils.logger import setup_logger
from utils.metrics import get_metrics_server, start_metrics_server, update_system_info

# Assigned by ``lifespan`` on application startup; ``None`` until then.
logger = None

# JPEG quality used for every image this module encodes.
JPEG_QUALITY = 85

# Pause between stopping and restarting the pipeline on reload.
RELOAD_SETTLE_SECONDS = 1

# Serialises reload requests. Created lazily inside the running event loop
# rather than at import time.
_reload_lock: Optional[asyncio.Lock] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the service on entry and shut the pipeline down on exit.

    Startup configures the module-level ``logger``, initialises the database,
    starts the metrics server, publishes system info and starts the inference
    pipeline. Shutdown stops the pipeline; a ``CancelledError`` raised while
    the application is running is swallowed so shutdown still completes.
    """
    global logger

    config = get_config()
    logger = setup_logger(
        name="security_monitor",
        log_file=config.logging.file,
        level=config.logging.level,
        max_size=config.logging.max_size,
        backup_count=config.logging.backup_count,
    )
    logger.info("启动安保异常行为识别系统")

    init_db()
    logger.info("数据库初始化完成")

    start_metrics_server()
    update_system_info()

    pipeline = start_pipeline()
    logger.info(f"推理Pipeline启动,活跃摄像头数: {len(pipeline.camera_threads)}")

    try:
        yield
    except asyncio.CancelledError:
        pass
    finally:
        logger.info("正在关闭系统...")
        stop_pipeline()
        logger.info("系统已关闭")


app = FastAPI(
    title="安保异常行为识别系统",
    description="基于YOLO和规则算法的智能监控系统",
    version="1.0.0",
    lifespan=lifespan,
)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site issue credentialed requests. Left unchanged to preserve behaviour;
# consider restricting allow_origins to the known front-end hosts.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(camera_router)
app.include_router(roi_router)
app.include_router(alarm_router)
app.include_router(sync_router)


def _latest_frame_or_404(camera_id: int):
    """Return the pipeline's latest frame for ``camera_id``.

    Raises:
        HTTPException: 404 when the pipeline has no frame for that camera.
    """
    frame = get_pipeline().get_latest_frame(camera_id)
    if frame is None:
        raise HTTPException(status_code=404, detail="无法获取帧")
    return frame


def _encode_jpeg(frame) -> bytes:
    """Encode ``frame`` as JPEG bytes at ``JPEG_QUALITY``.

    Raises:
        HTTPException: 500 when OpenCV reports that encoding failed.
    """
    ok, buffer = cv2.imencode(
        ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
    )
    # imencode signals failure through its first return value, not an exception.
    if not ok:
        raise HTTPException(status_code=500, detail="图像编码失败")
    return buffer.tobytes()


def _jpeg_response(jpeg_bytes: bytes) -> StreamingResponse:
    """Wrap already-encoded JPEG bytes in an ``image/jpeg`` response."""
    return StreamingResponse(iter([jpeg_bytes]), media_type="image/jpeg")


@app.get("/")
async def root():
    """Return static service identification."""
    return {
        "name": "安保异常行为识别系统",
        "version": "1.0.0",
        "status": "running",
    }


@app.get("/health")
async def health_check():
    """Report pipeline running state, camera-thread count and server time."""
    pipeline = get_pipeline()
    return {
        "status": "healthy",
        "running": pipeline.running,
        "cameras": len(pipeline.camera_threads),
        "timestamp": datetime.now().isoformat(),
    }


@app.get("/api/camera/{camera_id}/snapshot")
async def get_snapshot(camera_id: int):
    """Return the latest frame of a camera as a JPEG image.

    Raises:
        HTTPException: 404 if no frame is available, 500 if encoding fails.
    """
    return _jpeg_response(_encode_jpeg(_latest_frame_or_404(camera_id)))


@app.get("/api/camera/{camera_id}/snapshot/base64")
async def get_snapshot_base64(camera_id: int):
    """Return the latest frame of a camera as a base64-encoded JPEG string.

    Raises:
        HTTPException: 404 if no frame is available, 500 if encoding fails.
    """
    jpeg_bytes = _encode_jpeg(_latest_frame_or_404(camera_id))
    return {"image": base64.b64encode(jpeg_bytes).decode("utf-8")}


@app.get("/api/alarms/{alarm_id}/snapshot")
async def get_alarm_snapshot(alarm_id: int, db: Session = Depends(get_db)):
    """Serve the snapshot file stored for an alarm.

    Raises:
        HTTPException: 404 if the alarm does not exist, has no snapshot path,
            or the path does not point to an existing regular file.
    """
    from db.models import Alarm

    alarm = db.query(Alarm).filter(Alarm.id == alarm_id).first()
    if not alarm:
        raise HTTPException(status_code=404, detail="告警不存在")
    if not alarm.snapshot_path:
        raise HTTPException(status_code=404, detail="该告警没有截图")
    # isfile rather than exists: a directory at this path cannot be served.
    if not os.path.isfile(alarm.snapshot_path):
        raise HTTPException(status_code=404, detail="截图文件不存在")

    return FileResponse(alarm.snapshot_path, media_type="image/jpeg")


@app.get("/api/camera/{camera_id}/detect")
async def get_detection_with_overlay(camera_id: int):
    """Return the latest frame of a camera with its ROIs drawn on top.

    The ROIs are read from the database for ``camera_id`` and passed to
    ``draw_detections`` together with an empty detection list.

    Raises:
        HTTPException: 404 if no frame is available, 500 if encoding fails.
    """
    frame = _latest_frame_or_404(camera_id)

    from db.crud import get_all_rois
    from db.models import get_session_factory

    session = get_session_factory()()
    try:
        roi_configs = [
            {
                "id": roi.id,
                "roi_id": roi.roi_id,
                "name": roi.name,
                "type": roi.roi_type,
                "points": json.loads(roi.points),
                "rule": roi.rule_type,
                "enabled": roi.enabled,
            }
            for roi in get_all_rois(session, camera_id)
        ]
    finally:
        session.close()

    from utils.helpers import draw_detections

    # An empty ROI list is passed as None, as the original code did.
    overlay_frame = draw_detections(frame, [], roi_configs or None)
    return _jpeg_response(_encode_jpeg(overlay_frame))


@app.get("/api/pipeline/status")
async def get_pipeline_status():
    """Return whatever ``pipeline.get_status()`` reports."""
    return get_pipeline().get_status()


@app.get("/api/stream/list")
async def list_streams():
    """Return the stream manager's info for all streams."""
    return {"streams": get_pipeline().stream_manager.get_all_info()}


@app.post("/api/pipeline/reload")
async def reload_pipeline():
    """Stop the inference pipeline, pause briefly, and start it again.

    The pause uses ``asyncio.sleep`` so other requests keep being served
    while it elapses. A lock keeps concurrent reload requests from
    interleaving their stop/start calls.
    """
    global _reload_lock
    if _reload_lock is None:
        # Safe without extra locking: handlers run on a single event loop and
        # there is no await between the check and the assignment.
        _reload_lock = asyncio.Lock()

    async with _reload_lock:
        stop_pipeline()
        await asyncio.sleep(RELOAD_SETTLE_SECONDS)
        start_pipeline()

    return {"message": "Pipeline已重新加载"}


def main():
    """Run the application under uvicorn on 0.0.0.0:8000.

    The uvicorn log level is taken from the loaded configuration.
    """
    import uvicorn

    config = load_config()
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        log_level=config.logging.level.lower(),
    )


if __name__ == "__main__":
    main()