Files
Security_AI_integrated/main.py
2026-01-20 17:42:18 +08:00

212 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
import os
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
import cv2
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from prometheus_client import start_http_server
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from api.alarm import router as alarm_router
from api.camera import router as camera_router
from api.roi import router as roi_router
from config import get_config, load_config
from db.models import init_db
from inference.pipeline import get_pipeline, start_pipeline, stop_pipeline
from utils.logger import setup_logger
from utils.metrics import get_metrics_server, start_metrics_server, update_system_info
# Module-wide logger handle; it stays None until lifespan() configures it.
logger = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's serving phase.

    Startup, in order: build the module-level ``logger`` from the logging
    section of the configuration, initialise the database, start the metrics
    server, publish system info, and start the inference pipeline.

    Shutdown runs in a ``finally`` clause, so the pipeline is stopped even
    when the serving phase ends with an exception. A cancellation delivered
    while serving is swallowed so shutdown can proceed quietly.

    Args:
        app: The FastAPI application this lifespan is attached to. It is not
            used inside the function.
    """
    global logger

    log_cfg = get_config().logging
    logger = setup_logger(
        name="security_monitor",
        log_file=log_cfg.file,
        level=log_cfg.level,
        max_size=log_cfg.max_size,
        backup_count=log_cfg.backup_count,
    )
    logger.info("启动安保异常行为识别系统")

    # Storage must be ready before the pipeline starts.
    init_db()
    logger.info("数据库初始化完成")

    start_metrics_server()
    update_system_info()

    running_pipeline = start_pipeline()
    camera_count = len(running_pipeline.camera_threads)
    logger.info(f"推理Pipeline启动活跃摄像头数: {camera_count}")

    try:
        yield
    except asyncio.CancelledError:
        # Cancellation during serving is treated as a normal stop request.
        pass
    finally:
        logger.info("正在关闭系统...")
        stop_pipeline()
        logger.info("系统已关闭")
# The ASGI application; startup and shutdown are delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="安保异常行为识别系统",
    description="基于YOLO和规则算法的智能监控系统",
)

# NOTE(review): wildcard origins/methods/headers together with
# allow_credentials=True is a very permissive CORS policy - confirm that this
# is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers are registered before the routes declared below in this module;
# the registration order is kept as-is because it can affect route matching.
app.include_router(camera_router)
app.include_router(roi_router)
app.include_router(alarm_router)
@app.get("/")
async def root():
    """Return a static description of the service: name, version, status."""
    service_info = {}
    service_info["name"] = "安保异常行为识别系统"
    service_info["version"] = "1.0.0"
    service_info["status"] = "running"
    return service_info
@app.get("/health")
async def health_check():
    """Report basic liveness information about the inference pipeline.

    Returns:
        A dict with a constant ``"status"`` of ``"healthy"``, the pipeline's
        ``running`` attribute, the number of entries in its
        ``camera_threads``, and the current local time in ISO 8601 format.
    """
    active_pipeline = get_pipeline()

    report = {"status": "healthy"}
    report["running"] = active_pipeline.running
    report["cameras"] = len(active_pipeline.camera_threads)
    report["timestamp"] = datetime.now().isoformat()
    return report
@app.get("/api/camera/{camera_id}/snapshot")
async def get_snapshot(camera_id: int):
    """Return the most recent frame of a camera as a JPEG image.

    Args:
        camera_id: Identifier handed to the pipeline's ``get_latest_frame``.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg`` whose body is
        the frame encoded at JPEG quality 85.

    Raises:
        HTTPException: 404 when the pipeline returns no frame for the camera;
            500 when JPEG encoding reports failure.
    """
    frame = get_pipeline().get_latest_frame(camera_id)
    if frame is None:
        raise HTTPException(status_code=404, detail="无法获取帧")

    # cv2.imencode signals failure through its first return value; check it
    # instead of sending an empty or invalid body with a 200 status.
    encoded_ok, buffer = cv2.imencode(
        ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85]
    )
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="图像编码失败")

    return StreamingResponse(
        iter([buffer.tobytes()]),
        media_type="image/jpeg",
    )
@app.get("/api/camera/{camera_id}/snapshot/base64")
async def get_snapshot_base64(camera_id: int):
    """Return the most recent frame of a camera as a base64-encoded JPEG.

    Args:
        camera_id: Identifier handed to the pipeline's ``get_latest_frame``.

    Returns:
        A dict ``{"image": <str>}`` where the value is the base64 text of the
        frame encoded at JPEG quality 85.

    Raises:
        HTTPException: 404 when the pipeline returns no frame for the camera;
            500 when JPEG encoding reports failure.
    """
    frame = get_pipeline().get_latest_frame(camera_id)
    if frame is None:
        raise HTTPException(status_code=404, detail="无法获取帧")

    # cv2.imencode signals failure through its first return value; check it
    # instead of base64-encoding an empty or invalid buffer.
    encoded_ok, buffer = cv2.imencode(
        ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85]
    )
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="图像编码失败")

    return {"image": base64.b64encode(buffer).decode("utf-8")}
@app.get("/api/camera/{camera_id}/detect")
async def get_detection_with_overlay(camera_id: int):
    """Return the latest frame of a camera with its ROIs drawn on top.

    The ROIs stored for the camera are loaded from the database and passed to
    ``draw_detections`` together with an empty detection list, so only the
    ROI configuration is handed over for drawing.

    Args:
        camera_id: Identifier used both for the frame lookup and the ROI
            query.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg`` containing the
        overlaid frame encoded at JPEG quality 85.

    Raises:
        HTTPException: 404 when the pipeline returns no frame for the camera;
            500 when JPEG encoding reports failure.
    """
    frame = get_pipeline().get_latest_frame(camera_id)
    if frame is None:
        raise HTTPException(status_code=404, detail="无法获取帧")

    # Function-scope imports are kept from the original code.
    # NOTE(review): presumably this avoids import cycles or start-up cost -
    # confirm before moving them to module level.
    import json

    from db.crud import get_all_rois
    from db.models import get_session_factory
    from utils.helpers import draw_detections

    db = get_session_factory()()
    try:
        roi_configs = [
            {
                "id": roi.id,
                "roi_id": roi.roi_id,
                "name": roi.name,
                "type": roi.roi_type,
                # points are stored as JSON text and decoded here.
                "points": json.loads(roi.points),
                "rule": roi.rule_type,
                "enabled": roi.enabled,
            }
            for roi in get_all_rois(db, camera_id)
        ]
    finally:
        # Always release the session, even if loading or decoding fails.
        db.close()

    # An empty ROI list is passed on as None, exactly as before.
    overlay_frame = draw_detections(frame, [], roi_configs or None)

    # cv2.imencode signals failure through its first return value; check it
    # instead of sending an empty or invalid body with a 200 status.
    encoded_ok, buffer = cv2.imencode(
        ".jpg", overlay_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]
    )
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="图像编码失败")

    return StreamingResponse(
        iter([buffer.tobytes()]),
        media_type="image/jpeg",
    )
@app.get("/api/pipeline/status")
async def get_pipeline_status():
    """Return whatever the pipeline's ``get_status()`` reports."""
    status = get_pipeline().get_status()
    return status
@app.get("/api/stream/list")
async def list_streams():
    """Return the stream manager's info for all streams under ``"streams"``."""
    stream_manager = get_pipeline().stream_manager
    all_info = stream_manager.get_all_info()
    return {"streams": all_info}
@app.post("/api/pipeline/reload")
async def reload_pipeline():
    """Restart the inference pipeline: stop it, wait one second, start it.

    Returns:
        A dict with a confirmation message under ``"message"``.
    """
    stop_pipeline()

    # Pause between stop and start without blocking the event loop. The
    # previous time.sleep(1) froze every other request for that second.
    await asyncio.sleep(1)

    # NOTE(review): stop_pipeline()/start_pipeline() are still called directly
    # on the event loop; if they block for long, consider offloading them.
    start_pipeline()
    return {"message": "Pipeline已重新加载"}
def main():
    """Load the configuration and serve ``main:app`` with uvicorn.

    The server listens on 0.0.0.0:8000 with auto-reload disabled; the uvicorn
    log level is the lower-cased logging level from the configuration.
    """
    import uvicorn

    uvicorn_log_level = load_config().logging.level.lower()
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        log_level=uvicorn_log_level,
    )


# Script entry point.
if __name__ == "__main__":
    main()