- 创建 FastAPI 项目结构 - 实现告警数据模型(SQLAlchemy) - 实现 multipart/form-data 告警接收接口 - 实现阿里云 OSS 图片上传模块 - 实现告警查询和处理 API - 实现异步大模型分析模块
67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
import os
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
|
|
|
|
class AlertCreate(BaseModel):
|
|
camera_id: str = Field(..., description="摄像头标识")
|
|
roi_id: Optional[str] = Field(None, description="ROI区域标识")
|
|
alert_type: str = Field(..., description="告警类型")
|
|
algorithm: Optional[str] = Field(None, description="算法名称")
|
|
confidence: Optional[int] = Field(None, ge=0, le=100, description="置信度")
|
|
duration_minutes: Optional[int] = Field(None, ge=0, description="离岗时长(分钟)")
|
|
trigger_time: datetime = Field(..., description="触发时间")
|
|
message: Optional[str] = Field(None, description="告警消息")
|
|
|
|
|
|
class AlertResponse(BaseModel):
|
|
id: int
|
|
alert_no: str
|
|
camera_id: str
|
|
roi_id: Optional[str]
|
|
alert_type: str
|
|
algorithm: Optional[str]
|
|
confidence: Optional[int]
|
|
duration_minutes: Optional[int]
|
|
trigger_time: Optional[str]
|
|
message: Optional[str]
|
|
snapshot_url: Optional[str]
|
|
status: str
|
|
handle_remark: Optional[str]
|
|
handled_by: Optional[str]
|
|
handled_at: Optional[str]
|
|
ai_analysis: Optional[dict]
|
|
created_at: Optional[str]
|
|
updated_at: Optional[str]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AlertListResponse(BaseModel):
|
|
alerts: List[AlertResponse]
|
|
total: int
|
|
page: int
|
|
page_size: int
|
|
|
|
|
|
class AlertHandleRequest(BaseModel):
|
|
status: str = Field(..., description="处理状态: confirmed/ignored/resolved")
|
|
remark: Optional[str] = Field(None, description="处理备注")
|
|
|
|
|
|
class AlertStatisticsResponse(BaseModel):
|
|
total: int
|
|
pending: int
|
|
confirmed: int
|
|
ignored: int
|
|
resolved: int
|
|
by_type: dict
|
|
by_level: dict
|
|
|
|
|
|
class HealthResponse(BaseModel):
|
|
status: str
|
|
database: str
|