- 新增 edge/report 端点接收边缘端HTTP告警上报
- alarm_event_service 新增 create_from_edge_report 幂等创建
- schemas 新增 EdgeAlarmReport 模型
- 移除 config_service/redis_service/yudao_aiot_config 配置中转
- MQTT 服务标记废弃,告警上报改为HTTP+COS
- config 新增 COS/Redis 配置项
- requirements 新增 redis 依赖

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
129 lines
4.1 KiB
Python
129 lines
4.1 KiB
Python
import os
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime
|
|
|
|
|
|
class AlertCreate(BaseModel):
    """Fields supplied when a new alert is created.

    ``camera_id``, ``alert_type`` and ``trigger_time`` are mandatory; every
    other field may be omitted, in which case it defaults to ``None``.
    """

    # Source of the alert.
    camera_id: str = Field(..., description="摄像头标识")  # camera identifier
    roi_id: Optional[str] = Field(
        default=None, description="ROI区域标识"
    )  # ROI region identifier
    bind_id: Optional[str] = Field(default=None, description="绑定ID")  # binding ID
    device_id: Optional[str] = Field(
        default=None, description="边缘设备ID"
    )  # edge device ID

    # What was detected.
    alert_type: str = Field(..., description="告警类型")  # alert type
    algorithm: Optional[str] = Field(
        default=None, description="算法名称"
    )  # algorithm name
    # Integer confidence, validated to lie within 0..100 inclusive.
    confidence: Optional[int] = Field(
        default=None, ge=0, le=100, description="置信度"
    )
    # Off-post duration in minutes; negative values are rejected.
    duration_minutes: Optional[int] = Field(
        default=None, ge=0, description="离岗时长(分钟)"
    )
    trigger_time: datetime = Field(..., description="触发时间")  # trigger time
    message: Optional[str] = Field(
        default=None, description="告警消息"
    )  # alert message
    # Detection-box coordinates, carried as a string in this model.
    bbox: Optional[str] = Field(default=None, description="检测框坐标")
|
|
|
|
|
|
class AlertResponse(BaseModel):
    """Serialized view of a single alert record.

    Date/time values (``trigger_time``, ``handled_at``, ``created_at``,
    ``updated_at``) are typed as strings here, not ``datetime`` objects.

    Every nullable field carries an explicit ``None`` default. Pydantic v2
    treats ``Optional[X]`` *without* a default as a required field, so a
    source object or dict that lacks one of these keys would otherwise fail
    validation. The explicit default keeps the model behaving the same under
    Pydantic v1 and v2, and matches the existing ``bbox`` declaration.
    """

    # Always-present identity fields.
    id: int
    alert_no: str
    camera_id: str

    roi_id: Optional[str] = None
    bind_id: Optional[str] = None
    device_id: Optional[str] = None

    alert_type: str
    algorithm: Optional[str] = None
    confidence: Optional[int] = None
    duration_minutes: Optional[int] = None
    trigger_time: Optional[str] = None
    message: Optional[str] = None
    snapshot_url: Optional[str] = None
    # Deliberately untyped: unlike AlertCreate.bbox (a string), any value is
    # accepted here.
    bbox: Optional[Any] = None

    status: str
    level: Optional[str] = None

    # Fields whose names suggest handling / follow-up bookkeeping.
    handle_remark: Optional[str] = None
    handled_by: Optional[str] = None
    handled_at: Optional[str] = None
    work_order_id: Optional[int] = None
    ai_analysis: Optional[dict] = None

    created_at: Optional[str] = None
    updated_at: Optional[str] = None

    class Config:
        # Pydantic v2 option: allow the model to be populated from an
        # object's attributes, not only from a mapping.
        from_attributes = True
|
|
|
|
|
|
class AlertListResponse(BaseModel):
    """One page of alerts together with its paging counters."""

    # The alert entries contained in this page.
    alerts: List[AlertResponse]

    # Paging counters; all three are required integers.
    total: int
    page: int
    page_size: int
|
|
|
|
|
|
class AlertHandleRequest(BaseModel):
    """Input for handling an alert: a new status plus an optional remark."""

    # Required. The description lists confirmed/ignored/resolved, but the
    # field is a plain string and this model does not restrict its value.
    status: str = Field(
        ...,
        description="处理状态: confirmed/ignored/resolved",
    )

    # Optional free-text handling remark; defaults to None.
    remark: Optional[str] = Field(
        default=None,
        description="处理备注",
    )
|
|
|
|
|
|
class AlertStatisticsResponse(BaseModel):
    """Aggregate alert counters, overall and broken down by category."""

    total: int

    # Per-status counters. Only ``dispatched`` is optional; it falls back
    # to 0 when the source omits it.
    pending: int
    confirmed: int
    ignored: int
    resolved: int
    dispatched: int = 0

    # Breakdowns. ``by_type`` is required; ``by_level`` may be omitted and
    # then defaults to None.
    by_type: dict
    by_level: Optional[dict] = None
|
|
|
|
|
|
# ==================== 设备相关 ====================
|
|
|
|
class DeviceResponse(BaseModel):
    """Serialized view of a single device record.

    ``last_heartbeat``, ``created_at`` and ``updated_at`` are typed as
    strings here, not ``datetime`` objects.

    Every nullable field carries an explicit ``None`` default. Pydantic v2
    treats ``Optional[X]`` *without* a default as a required field, so a
    source lacking one of these keys would otherwise fail validation. The
    explicit default keeps the model behaving the same under Pydantic v1
    and v2.
    """

    # Always-present fields.
    id: int
    device_id: str
    device_name: Optional[str] = None
    status: str

    # Runtime counters; each may be absent.
    last_heartbeat: Optional[str] = None
    uptime_seconds: Optional[int] = None
    frames_processed: Optional[int] = None
    alerts_generated: Optional[int] = None

    # Free-form string-keyed mapping; the value types are unconstrained.
    extra_info: Optional[Dict[str, Any]] = None

    created_at: Optional[str] = None
    updated_at: Optional[str] = None

    class Config:
        # Pydantic v2 option: allow the model to be populated from an
        # object's attributes, not only from a mapping.
        from_attributes = True
|
|
|
|
|
|
class DeviceListResponse(BaseModel):
    """One page of devices together with its paging counters."""

    # The device entries contained in this page.
    devices: List[DeviceResponse]

    # Paging counters; all three are required integers.
    total: int
    page: int
    page_size: int
|
|
|
|
|
|
class DeviceStatisticsResponse(BaseModel):
    """Device counters: an overall total plus one count per status name."""

    total: int

    # All counters are required integers.
    online: int
    offline: int
    error: int
|
|
|
|
|
|
# ==================== 健康检查 ====================
|
|
|
|
class HealthResponse(BaseModel):
    """Health-check payload: overall status plus per-component details."""

    # Required status strings.
    status: str
    database: str

    # Optional details; each defaults to None when not reported.
    mqtt: Optional[str] = None
    websocket_connections: Optional[int] = None
|
|
|
|
|
|
# ==================== 边缘告警上报 ====================
|
|
|
|
class EdgeAlarmReport(BaseModel):
    """Request body for an alarm report sent by the edge side."""

    # Alarm ID generated on the edge; at most 64 characters.
    alarm_id: str = Field(
        ...,
        max_length=64,
        description="边缘端生成的告警ID (edge_{device_id}_{timestamp}_{uuid})",
    )

    # Alarm type; the description gives leave_post/intrusion/crowd as examples.
    alarm_type: str = Field(
        ...,
        max_length=32,
        description="告警类型: leave_post/intrusion/crowd 等",
    )

    # Camera / device ID.
    device_id: str = Field(
        ...,
        max_length=64,
        description="摄像头/设备ID",
    )

    # Scene / ROI ID; optional.
    scene_id: Optional[str] = Field(
        default=None,
        max_length=64,
        description="场景/ROI ID",
    )

    # Event time. Described as ISO8601, but typed as a plain string, so this
    # model does not itself check the format.
    event_time: str = Field(
        ...,
        description="事件发生时间 ISO8601",
    )

    # Alarm level, validated to 1..4 inclusive; defaults to 2 when omitted.
    alarm_level: int = Field(
        default=2,
        ge=1,
        le=4,
        description="告警级别: 1提醒 2一般 3严重 4紧急",
    )

    # Snapshot reference (described as a COS object key); up to 512 chars.
    snapshot_url: Optional[str] = Field(
        default=None,
        max_length=512,
        description="截图 COS object_key",
    )

    # Algorithm code; optional.
    algorithm_code: Optional[str] = Field(
        default=None,
        max_length=64,
        description="算法编码",
    )

    # Confidence as a float, validated to lie within 0..1 inclusive.
    confidence_score: Optional[float] = Field(
        default=None,
        ge=0,
        le=1,
        description="置信度 0-1",
    )

    # Free-form extension data (the description mentions bbox/target_class).
    ext_data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="扩展数据 (bbox/target_class 等)",
    )
|