Files
Security_AI_integrated/api/camera.py

203 lines
5.7 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Body
from pydantic import BaseModel
from sqlalchemy.orm import Session
from db.crud import (
create_camera,
delete_camera,
get_all_cameras,
get_camera_by_id,
update_camera,
)
from db.models import get_db
from inference.pipeline import get_pipeline
# Router carrying the camera CRUD endpoints and the per-camera status
# endpoint; all of its routes share the /api/cameras prefix.
router = APIRouter(prefix="/api/cameras", tags=["摄像头管理"])
# Router using the singular /api/camera prefix.
# NOTE(review): router2 is assigned a second time further down in this file
# with identical arguments; only the later object receives routes in the
# visible code, so one of the two assignments looks redundant.
router2 = APIRouter(prefix="/api/camera", tags=["摄像头状态"])
class CameraUpdateRequest(BaseModel):
    """Request body used by both the create (POST) and update (PUT) endpoints.

    Every field is optional and defaults to ``None``.
    """

    # Camera name; forwarded unchanged to create_camera / update_camera.
    name: Optional[str] = None
    # Stream URL of the camera; forwarded unchanged to the CRUD helpers.
    rtsp_url: Optional[str] = None
    # The create endpoint substitutes 30 when this is missing or 0.
    fps_limit: Optional[int] = None
    # The create endpoint substitutes 3 when this is missing or 0.
    process_every_n_frames: Optional[int] = None
    # On update, True adds the camera to the pipeline and False removes it;
    # None leaves the pipeline untouched.
    enabled: Optional[bool] = None
@router.get("", response_model=List[dict])
def list_cameras(
    enabled_only: bool = True,
    db: Session = Depends(get_db),
):
    """Return the stored cameras as a list of plain dictionaries.

    Args:
        enabled_only: Passed through to ``get_all_cameras``; defaults to True.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        One dict per camera with the keys ``id``, ``name``, ``rtsp_url``,
        ``enabled``, ``fps_limit``, ``process_every_n_frames`` and
        ``created_at`` (the ``isoformat()`` string, or None when unset).
    """
    payload = []
    for record in get_all_cameras(db, enabled_only=enabled_only):
        created = record.created_at
        payload.append(
            {
                "id": record.id,
                "name": record.name,
                "rtsp_url": record.rtsp_url,
                "enabled": record.enabled,
                "fps_limit": record.fps_limit,
                "process_every_n_frames": record.process_every_n_frames,
                "created_at": created.isoformat() if created else None,
            }
        )
    return payload
@router.get("/{camera_id}", response_model=dict)
def get_camera(camera_id: int, db: Session = Depends(get_db)):
    """Return a single camera looked up by its id.

    Args:
        camera_id: Id taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the keys ``id``, ``name``, ``rtsp_url``, ``enabled``,
        ``fps_limit``, ``process_every_n_frames`` and ``created_at`` (the
        ``isoformat()`` string, or None when unset).

    Raises:
        HTTPException: 404 when ``get_camera_by_id`` returns a falsy value.
    """
    record = get_camera_by_id(db, camera_id)
    if record:
        created = record.created_at
        return {
            "id": record.id,
            "name": record.name,
            "rtsp_url": record.rtsp_url,
            "enabled": record.enabled,
            "fps_limit": record.fps_limit,
            "process_every_n_frames": record.process_every_n_frames,
            "created_at": created.isoformat() if created else None,
        }
    raise HTTPException(status_code=404, detail="摄像头不存在")
@router.post("", response_model=dict)
def add_camera(
    request: CameraUpdateRequest = Body(...),
    db: Session = Depends(get_db),
):
    """Create a camera and, when it is enabled, hand it to the pipeline.

    Args:
        request: Request body. ``name`` and ``rtsp_url`` are required here
            even though the shared model declares them optional.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the ``id``, ``name``, ``rtsp_url`` and ``enabled`` values
        of the created camera.

    Raises:
        HTTPException: 422 when ``name`` or ``rtsp_url`` is missing or blank.
    """
    # CameraUpdateRequest is shared with the update endpoint, so all of its
    # fields are optional. Creation needs a name and a stream URL; reject the
    # request up front instead of storing (and starting) a camera without them.
    missing = [
        field
        for field in ("name", "rtsp_url")
        if not (getattr(request, field) or "").strip()
    ]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"缺少必填字段: {', '.join(missing)}",
        )

    # NOTE(review): request.enabled is not forwarded to create_camera; the
    # enabled value of the returned object decides whether the stream starts.
    camera = create_camera(
        db,
        name=request.name,
        rtsp_url=request.rtsp_url,
        # `or` also replaces an explicit 0 with the default.
        fps_limit=request.fps_limit or 30,
        process_every_n_frames=request.process_every_n_frames or 3,
    )
    if camera.enabled:
        get_pipeline().add_camera(camera)
    return {
        "id": camera.id,
        "name": camera.name,
        "rtsp_url": camera.rtsp_url,
        "enabled": camera.enabled,
    }
@router.put("/{camera_id}", response_model=dict)
def modify_camera(
    camera_id: int,
    request: CameraUpdateRequest = Body(...),
    db: Session = Depends(get_db),
):
    """Update a camera and start or stop its pipeline stream on request.

    Args:
        camera_id: Id taken from the URL path.
        request: Request body; every field is forwarded to ``update_camera``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the ``id``, ``name``, ``rtsp_url`` and ``enabled`` values
        of the updated camera.

    Raises:
        HTTPException: 404 when ``update_camera`` returns a falsy value.
    """
    changes = {
        "name": request.name,
        "rtsp_url": request.rtsp_url,
        "fps_limit": request.fps_limit,
        "process_every_n_frames": request.process_every_n_frames,
        "enabled": request.enabled,
    }
    updated = update_camera(db, camera_id=camera_id, **changes)
    if not updated:
        raise HTTPException(status_code=404, detail="摄像头不存在")

    # The pipeline is only touched when the request carries an explicit
    # enabled flag.
    # NOTE(review): a change to rtsp_url or the frame settings without an
    # enabled flag does not reach the pipeline in this function — confirm
    # whether a running stream picks up the new values some other way.
    manager = get_pipeline()
    toggle = request.enabled
    if toggle is True:
        manager.add_camera(updated)
    if toggle is False:
        manager.remove_camera(camera_id)

    return {
        "id": updated.id,
        "name": updated.name,
        "rtsp_url": updated.rtsp_url,
        "enabled": updated.enabled,
    }
@router.delete("/{camera_id}")
def remove_camera(camera_id: int, db: Session = Depends(get_db)):
    """Remove a camera from the pipeline and delete it from the database.

    The pipeline removal is issued first, before the database confirms that
    the camera exists.

    Args:
        camera_id: Id taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict holding a success message.

    Raises:
        HTTPException: 404 when ``delete_camera`` returns a falsy value.
    """
    get_pipeline().remove_camera(camera_id)
    deleted = delete_camera(db, camera_id)
    if deleted:
        return {"message": "删除成功"}
    raise HTTPException(status_code=404, detail="摄像头不存在")
@router.get("/{camera_id}/status")
def get_camera_status(camera_id: int, db: Session = Depends(get_db)):
    """Report the stored status and live stream info of one camera.

    Args:
        camera_id: Id taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the keys ``camera_id``, ``is_running``, ``fps``,
        ``error_message``, ``last_check_time`` and ``stream``. When no status
        record is found the status fields fall back to False / 0.0 / None;
        ``stream`` is None when the pipeline has no stream for the camera.
    """
    # Imported locally under an alias: this endpoint function has the same
    # name as the CRUD helper it calls.
    from db.crud import get_camera_status as get_status

    row = get_status(db, camera_id)
    live = get_pipeline().stream_manager.get_stream(str(camera_id))
    stream_info = live.get_info() if live else None

    # Start from the "no status record" defaults and overwrite them in place
    # when a record exists.
    report = {
        "camera_id": camera_id,
        "is_running": False,
        "fps": 0.0,
        "error_message": None,
        "last_check_time": None,
        "stream": stream_info,
    }
    if row:
        checked = row.last_check_time
        report.update(
            is_running=row.is_running,
            fps=row.fps,
            error_message=row.error_message,
            last_check_time=checked.isoformat() if checked else None,
        )
    return report
# NOTE(review): router2 was already created near the top of this file with
# the same prefix and tags; this assignment replaces it with a new router
# object. No routes are registered on the earlier object in the visible code,
# so one of the two assignments looks redundant.
router2 = APIRouter(prefix="/api/camera", tags=["摄像头状态"])
@router2.get("/status/all")
def get_all_camera_status(db: Session = Depends(get_db)):
    """Report the stored status and live stream info of every camera.

    Cameras are fetched with ``enabled_only=False``, so disabled cameras are
    included.

    Args:
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list with one dict per camera, each holding ``camera_id``,
        ``is_running``, ``fps``, ``error_message``, ``last_check_time`` and
        ``stream``. Cameras without a status record get False / 0.0 / None
        for the status fields.
    """
    # get_camera_status is aliased because this module defines an endpoint
    # function with the same name.
    from db.crud import get_all_cameras, get_camera_status as get_status

    cameras = get_all_cameras(db, enabled_only=False)
    pipeline = get_pipeline()

    def describe(cam):
        # One status lookup and one stream lookup per camera.
        row = get_status(db, cam.id)
        live = pipeline.stream_manager.get_stream(str(cam.id))
        info = live.get_info() if live else None
        if not row:
            return {
                "camera_id": cam.id,
                "is_running": False,
                "fps": 0.0,
                "error_message": None,
                "last_check_time": None,
                "stream": info,
            }
        checked = row.last_check_time
        return {
            "camera_id": cam.id,
            "is_running": row.is_running,
            "fps": row.fps,
            "error_message": row.error_message,
            "last_check_time": checked.isoformat() if checked else None,
            "stream": info,
        }

    return [describe(cam) for cam in cameras]