from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from typing import Optional from datetime import datetime from pathlib import Path from app.config import settings from app.models import init_db, get_engine, AlertStatus from app.schemas import ( AlertCreate, AlertResponse, AlertListResponse, AlertHandleRequest, AlertStatisticsResponse, HealthResponse, ) from app.services.alert_service import get_alert_service from app.services.ai_analyzer import trigger_async_analysis from app.utils.logger import logger import json app = FastAPI( title="AI 告警平台", description="接收边缘端告警,提供告警查询与处理能力", version="1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def get_alert_service(): return get_alert_service() @app.on_event("startup") async def startup(): init_db() logger.info("AI 告警平台启动") @app.get("/health", response_model=HealthResponse) async def health_check(): db_status = "healthy" try: engine = get_engine() with engine.connect() as conn: conn.execute("SELECT 1") except Exception as e: db_status = f"unhealthy: {e}" return HealthResponse( status="healthy" if db_status == "healthy" else "degraded", database=db_status, ) @app.post("/api/v1/alerts", response_model=AlertResponse) async def create_alert( data: str = Form(..., description="JSON数据"), snapshot: Optional[UploadFile] = File(None), service=Depends(get_alert_service), ): try: alert_data = json.loads(data) alert_create = AlertCreate(**alert_data) except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"JSON解析失败: {e}") snapshot_data = None if snapshot: snapshot_data = await snapshot.read() alert = service.create_alert(alert_create, snapshot_data) if alert.snapshot_url and settings.ai_model.endpoint: trigger_async_analysis( alert_id=alert.id, snapshot_url=alert.snapshot_url, alert_info={ "camera_id": alert.camera_id, "roi_id": alert.roi_id, "alert_type": alert.alert_type, "confidence": alert.confidence, "duration_minutes": alert.duration_minutes, }, ) return AlertResponse(**alert.to_dict()) @app.get("/api/v1/alerts", response_model=AlertListResponse) async def list_alerts( camera_id: Optional[str] = Query(None, description="摄像头ID"), alert_type: Optional[str] = Query(None, description="告警类型"), status: Optional[str] = Query(None, description="处理状态"), start_time: Optional[datetime] = Query(None, description="开始时间"), end_time: Optional[datetime] = Query(None, description="结束时间"), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), service=Depends(get_alert_service), ): alerts, total = service.get_alerts( camera_id=camera_id, alert_type=alert_type, status=status, start_time=start_time, end_time=end_time, page=page, page_size=page_size, ) return AlertListResponse( alerts=[AlertResponse(**a.to_dict()) for a in alerts], total=total, page=page, page_size=page_size, ) @app.get("/api/v1/alerts/{alert_id}", response_model=AlertResponse) async def get_alert( alert_id: int, service=Depends(get_alert_service), ): alert = service.get_alert(alert_id) if not alert: raise HTTPException(status_code=404, detail="告警不存在") return AlertResponse(**alert.to_dict()) @app.put("/api/v1/alerts/{alert_id}/handle", response_model=AlertResponse) async def handle_alert( alert_id: int, handle_data: AlertHandleRequest, handled_by: Optional[str] = Query(None, description="处理人"), service=Depends(get_alert_service), ): alert = service.handle_alert(alert_id, handle_data, handled_by) if not alert: raise HTTPException(status_code=404, detail="告警不存在") return AlertResponse(**alert.to_dict()) @app.get("/api/v1/alerts/statistics", response_model=AlertStatisticsResponse) async def get_statistics( service=Depends(get_alert_service), ): stats = service.get_statistics() return AlertStatisticsResponse(**stats) @app.get("/api/v1/alerts/{alert_id}/image") async def get_alert_image( alert_id: int, service=Depends(get_alert_service), ): alert = service.get_alert(alert_id) if not alert: raise HTTPException(status_code=404, detail="告警不存在") if not alert.snapshot_path: raise HTTPException(status_code=404, detail="图片不存在") return FileResponse(alert.snapshot_path) if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host=settings.app.host, port=settings.app.port, reload=settings.app.debug, )