120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
|
|
"""
|
|||
|
|
AIoT 边缘设备路由 - 芋道规范
|
|||
|
|
|
|||
|
|
统一到 /admin-api/aiot/edge 命名空间,与 aiot 平台架构对齐。
|
|||
|
|
|
|||
|
|
API 路径规范:
|
|||
|
|
- /admin-api/aiot/edge/device/page - 分页查询设备
|
|||
|
|
- /admin-api/aiot/edge/device/get - 获取设备详情
|
|||
|
|
- /admin-api/aiot/edge/device/statistics - 设备统计
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from fastapi import APIRouter, Query, Depends, HTTPException
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
from app.yudao_compat import YudaoResponse, get_current_user
|
|||
|
|
from app.services.device_service import get_device_service, DeviceService
|
|||
|
|
|
|||
|
|
router = APIRouter(prefix="/admin-api/aiot/edge", tags=["AIoT-边缘设备"])
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/device/page")
|
|||
|
|
async def get_device_page(
|
|||
|
|
pageNo: int = Query(1, ge=1, description="页码"),
|
|||
|
|
pageSize: int = Query(20, ge=1, le=100, description="每页大小"),
|
|||
|
|
status: Optional[str] = Query(None, description="设备状态: online/offline/error"),
|
|||
|
|
service: DeviceService = Depends(get_device_service),
|
|||
|
|
current_user: dict = Depends(get_current_user)
|
|||
|
|
):
|
|||
|
|
"""分页查询边缘设备列表"""
|
|||
|
|
devices, total = service.get_devices(
|
|||
|
|
status=status,
|
|||
|
|
page=pageNo,
|
|||
|
|
page_size=pageSize,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
device_list = []
|
|||
|
|
for device in devices:
|
|||
|
|
device_dict = device.to_dict()
|
|||
|
|
device_list.append({
|
|||
|
|
"id": device_dict.get("id"),
|
|||
|
|
"deviceId": device_dict.get("device_id"),
|
|||
|
|
"deviceName": device_dict.get("device_name"),
|
|||
|
|
"status": device_dict.get("status"),
|
|||
|
|
"statusName": _get_status_name(device_dict.get("status")),
|
|||
|
|
"lastHeartbeat": device_dict.get("last_heartbeat"),
|
|||
|
|
"uptimeSeconds": device_dict.get("uptime_seconds"),
|
|||
|
|
"framesProcessed": device_dict.get("frames_processed"),
|
|||
|
|
"alertsGenerated": device_dict.get("alerts_generated"),
|
|||
|
|
"ipAddress": device_dict.get("ip_address"),
|
|||
|
|
"streamCount": device_dict.get("stream_count"),
|
|||
|
|
"configVersion": device_dict.get("config_version"),
|
|||
|
|
"extraInfo": device_dict.get("extra_info"),
|
|||
|
|
"updatedAt": device_dict.get("updated_at"),
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
return YudaoResponse.page(
|
|||
|
|
list_data=device_list,
|
|||
|
|
total=total,
|
|||
|
|
page_no=pageNo,
|
|||
|
|
page_size=pageSize
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/device/get")
|
|||
|
|
async def get_device(
|
|||
|
|
id: str = Query(..., description="设备ID"),
|
|||
|
|
service: DeviceService = Depends(get_device_service),
|
|||
|
|
current_user: dict = Depends(get_current_user)
|
|||
|
|
):
|
|||
|
|
"""获取设备详情"""
|
|||
|
|
device = service.get_device(id)
|
|||
|
|
if not device:
|
|||
|
|
raise HTTPException(status_code=404, detail="设备不存在")
|
|||
|
|
|
|||
|
|
device_dict = device.to_dict()
|
|||
|
|
return YudaoResponse.success({
|
|||
|
|
"id": device_dict.get("id"),
|
|||
|
|
"deviceId": device_dict.get("device_id"),
|
|||
|
|
"deviceName": device_dict.get("device_name"),
|
|||
|
|
"status": device_dict.get("status"),
|
|||
|
|
"statusName": _get_status_name(device_dict.get("status")),
|
|||
|
|
"lastHeartbeat": device_dict.get("last_heartbeat"),
|
|||
|
|
"uptimeSeconds": device_dict.get("uptime_seconds"),
|
|||
|
|
"framesProcessed": device_dict.get("frames_processed"),
|
|||
|
|
"alertsGenerated": device_dict.get("alerts_generated"),
|
|||
|
|
"ipAddress": device_dict.get("ip_address"),
|
|||
|
|
"streamCount": device_dict.get("stream_count"),
|
|||
|
|
"configVersion": device_dict.get("config_version"),
|
|||
|
|
"extraInfo": device_dict.get("extra_info"),
|
|||
|
|
"updatedAt": device_dict.get("updated_at"),
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/device/statistics")
|
|||
|
|
async def get_device_statistics(
|
|||
|
|
service: DeviceService = Depends(get_device_service),
|
|||
|
|
current_user: dict = Depends(get_current_user)
|
|||
|
|
):
|
|||
|
|
"""获取设备统计"""
|
|||
|
|
stats = service.get_statistics()
|
|||
|
|
|
|||
|
|
return YudaoResponse.success({
|
|||
|
|
"total": stats.get("total", 0),
|
|||
|
|
"online": stats.get("online", 0),
|
|||
|
|
"offline": stats.get("offline", 0),
|
|||
|
|
"error": stats.get("error", 0),
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ==================== 辅助函数 ====================
|
|||
|
|
|
|||
|
|
def _get_status_name(status: Optional[str]) -> str:
|
|||
|
|
"""获取设备状态名称"""
|
|||
|
|
status_names = {
|
|||
|
|
"online": "在线",
|
|||
|
|
"offline": "离线",
|
|||
|
|
"error": "异常",
|
|||
|
|
}
|
|||
|
|
return status_names.get(status, status or "未知")
|