"""FastAPI Web 仪表盘"""
import os
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import Optional
from fastapi import Cookie, Depends, FastAPI, File, Form, Header, HTTPException, Query, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, field_validator
from uuid import uuid4
from ..core import database as db
from ..core.models import Exercise, Meal, Sleep, UserConfig, Weight, User, Reading, Invite
from ..core.auth import hash_password, verify_password, create_token, decode_token, generate_invite_code, get_token_expire_seconds
# Initialise the database (runs at import time).
db.init_db()
db.migrate_auth_fields()  # migrate the authentication fields
db.migrate_config_add_user_id()  # migrate the config table to add user_id
# The FastAPI application instance that every route below is registered on.
app = FastAPI(
    title="Vitals 健康管理",
    description="本地优先的综合健康管理应用",
    version="0.1.0",
)
# CORS configuration.
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled — confirm this permissive policy is intended beyond local use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Authentication middleware
class AuthMiddleware(BaseHTTPMiddleware):
    """Server-side authentication gate.

    Requests whose path starts with one of ``PUBLIC_PREFIXES`` pass through
    untouched. Any other request must carry a token accepted by
    ``decode_token``, read from the ``auth_token`` cookie or, for ``/api/``
    paths only, from an ``Authorization: Bearer ...`` header. Unauthenticated
    API requests receive a 401 JSON response; unauthenticated page requests
    are redirected to the login page.
    """

    # Path prefixes that do not require authentication.
    PUBLIC_PREFIXES = (
        "/api/auth/login",
        "/api/auth/register",
        "/login",
        "/register",
        "/static",
        "/photos",
        "/docs",
        "/openapi.json",
        "/favicon.ico",
    )

    async def dispatch(self, request: Request, call_next):
        """Let the request through when it is public or carries a valid token.

        Returns the downstream response, a 401 ``JSONResponse`` (API paths)
        or a 302 ``RedirectResponse`` to ``/login`` (page paths).
        """
        # Imported locally so this block does not depend on a file-level import.
        from urllib.parse import quote

        path = request.url.path

        # Public paths skip authentication entirely.
        if path.startswith(self.PUBLIC_PREFIXES):
            return await call_next(request)

        # The cookie is the primary token source.
        token = request.cookies.get("auth_token")

        # API requests may also authenticate via header (backward compatible).
        if not token and path.startswith("/api/"):
            auth_header = request.headers.get("Authorization")
            if auth_header and auth_header.startswith("Bearer "):
                token = auth_header[7:]

        # A token that decodes successfully is enough to continue.
        if token and decode_token(token):
            return await call_next(request)

        # Not authenticated: API callers get a 401 ...
        if path.startswith("/api/"):
            return JSONResponse(
                status_code=401,
                content={"detail": "未登录或登录已过期"},
            )

        # ... and page requests are sent to the login page. The original path
        # is percent-encoded so characters such as '&', '#' or '?' cannot
        # truncate or extend the ``redirect`` query parameter.
        if path == "/":
            redirect_url = "/login"
        else:
            redirect_url = "/login?redirect=" + quote(path, safe="/")
        return RedirectResponse(url=redirect_url, status_code=302)
app.add_middleware(AuthMiddleware)
# NOTE(review): this middleware is registered after CORSMiddleware — confirm
# the resulting middleware order is the intended one for 401 responses.

# Static files shipped next to this module; mounted only if the directory
# exists when the module is imported.
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    app.mount("/static", StaticFiles(directory=static_dir), name="static")
# Meal photo files under ~/.vitals/photos; mounted only if the directory
# already exists when the module is imported.
photos_dir = Path.home() / ".vitals" / "photos"
if photos_dir.exists():
    app.mount("/photos", StaticFiles(directory=photos_dir), name="photos")
# ===== Pydantic 模型 =====
class ExerciseResponse(BaseModel):
    """Exercise record as returned by the API.

    ``date`` is a string; the route handlers in this file fill it with
    ``record.date.isoformat()``.
    """

    id: Optional[int]
    date: str
    type: str
    duration: int
    calories: int
    distance: Optional[float]
    heart_rate_avg: Optional[int]
    source: str
    notes: Optional[str]
class MealResponse(BaseModel):
    """Meal record as returned by the API.

    ``date`` is a string; the route handlers in this file fill it with
    ``record.date.isoformat()``.
    """

    id: Optional[int]
    date: str
    meal_type: str
    description: str
    calories: int
    protein: Optional[float]
    carbs: Optional[float]
    fat: Optional[float]
    photo_path: Optional[str]
class SleepResponse(BaseModel):
    """Sleep record as returned by the API (all date/time fields are strings)."""

    id: Optional[int]
    date: str
    bedtime: Optional[str]
    wake_time: Optional[str]
    duration: float
    quality: int
    deep_sleep_mins: Optional[int]
    source: str
    notes: Optional[str]
class WeightResponse(BaseModel):
    """Weight record as returned by the API."""

    id: Optional[int]
    date: str
    weight_kg: float
    body_fat_pct: Optional[float]
    muscle_mass: Optional[float]
    notes: Optional[str]
class ConfigResponse(BaseModel):
    """User configuration as returned by ``GET /api/config``."""

    age: Optional[int]
    gender: Optional[str]
    height: Optional[float]
    weight: Optional[float]
    activity_level: str
    goal: str
    bmr: Optional[int]
    tdee: Optional[int]
class TodaySummary(BaseModel):
    """Payload of ``GET /api/today``: daily totals plus the day's records."""

    date: str
    calories_intake: int
    calories_burned: int
    # The handler computes this as intake - (tdee + calories_burned).
    calories_balance: int
    exercise_count: int
    exercise_duration: int
    sleep_duration: Optional[float]
    sleep_quality: Optional[int]
    weight: Optional[float]
    tdee: Optional[int]
    meals: list[MealResponse]
    exercises: list[ExerciseResponse]
class WeekSummary(BaseModel):
    """Payload of ``GET /api/week``: weekly aggregates and per-day stats."""

    start_date: str
    end_date: str
    total_exercise_count: int
    total_exercise_duration: int
    total_calories_burned: int
    avg_calories_intake: int
    avg_sleep_duration: float
    avg_sleep_quality: float
    weight_start: Optional[float]
    weight_end: Optional[float]
    weight_change: Optional[float]
    # One dict per day; the handler fills the keys "date", "weekday",
    # "exercise_duration", "calories_intake", "calories_burned" and
    # "sleep_duration".
    daily_stats: list[dict]
# ===== Web 数据录入模型 =====
class ExerciseInput(BaseModel):
    """Request body for creating an exercise record from the web UI."""

    date: str
    type: str
    duration: int
    calories: Optional[int] = None
    distance: Optional[float] = None
    heart_rate_avg: Optional[int] = None
    notes: Optional[str] = None

    @field_validator("duration")
    @classmethod
    def validate_duration(cls, value: int) -> int:
        """Accept only durations from 1 to 1440 (inclusive)."""
        if 0 < value <= 1440:
            return value
        raise ValueError("时长必须在 1-1440 分钟之间")
class SleepInput(BaseModel):
    """Request body for creating a sleep record from the web UI."""

    date: str
    bedtime: Optional[str] = None
    wake_time: Optional[str] = None
    duration: float
    quality: int
    notes: Optional[str] = None

    @field_validator("quality")
    @classmethod
    def validate_quality(cls, value: int) -> int:
        """Accept only quality scores from 1 to 5 (inclusive)."""
        if 1 <= value <= 5:
            return value
        raise ValueError("质量评分必须在 1-5 之间")

    @field_validator("duration")
    @classmethod
    def validate_sleep_duration(cls, value: float) -> float:
        """Reject durations that are <= 0 or greater than 24."""
        # Kept as two explicit comparisons so float edge cases behave exactly
        # as the comparisons themselves dictate.
        non_positive = value <= 0
        over_a_day = value > 24
        if non_positive or over_a_day:
            raise ValueError("睡眠时长必须在 0-24 小时之间")
        return value
class WeightInput(BaseModel):
    """Request body for creating a weight record from the web UI."""

    date: str
    weight_kg: float
    body_fat_pct: Optional[float] = None
    muscle_mass: Optional[float] = None
    notes: Optional[str] = None

    @field_validator("weight_kg")
    @classmethod
    def validate_weight(cls, value: float) -> float:
        """Reject weights below 20 or above 300."""
        too_light = value < 20
        too_heavy = value > 300
        if too_light or too_heavy:
            raise ValueError("体重必须在 20-300 kg 之间")
        return value
class UserResponse(BaseModel):
    """User profile as returned by the API; profile fields default to None."""

    id: int
    name: str
    created_at: str
    is_active: bool
    gender: Optional[str] = None
    height_cm: Optional[float] = None
    weight_kg: Optional[float] = None
    age: Optional[int] = None
    bmi: Optional[float] = None
    bmi_status: Optional[str] = None
class UserInput(BaseModel):
    """Request body for creating or editing a user profile.

    Only ``name`` is required; every optional field is validated only when a
    value is supplied.
    """

    name: str
    gender: Optional[str] = None
    height_cm: Optional[float] = None
    weight_kg: Optional[float] = None
    age: Optional[int] = None

    @field_validator("name")
    @classmethod
    def validate_name(cls, value: str) -> str:
        """Require a non-blank name of at most 50 characters; return it stripped."""
        trimmed = value.strip()
        if not trimmed:
            raise ValueError("用户名不能为空")
        # The length limit applies to the value as submitted (before stripping).
        if len(value) > 50:
            raise ValueError("用户名不能超过 50 个字符")
        return trimmed

    @field_validator("gender")
    @classmethod
    def validate_gender(cls, value: Optional[str]) -> Optional[str]:
        """Allow None, 'male' or 'female'."""
        if value is None or value in ("male", "female"):
            return value
        raise ValueError("性别必须是 'male' 或 'female'")

    @field_validator("height_cm")
    @classmethod
    def validate_height(cls, value: Optional[float]) -> Optional[float]:
        """Allow None; otherwise reject heights below 50 or above 300."""
        if value is None:
            return value
        if value < 50 or value > 300:
            raise ValueError("身高必须在 50-300 cm 之间")
        return value

    @field_validator("weight_kg")
    @classmethod
    def validate_weight(cls, value: Optional[float]) -> Optional[float]:
        """Allow None; otherwise reject weights below 20 or above 500."""
        if value is None:
            return value
        if value < 20 or value > 500:
            raise ValueError("体重必须在 20-500 kg 之间")
        return value

    @field_validator("age")
    @classmethod
    def validate_age(cls, value: Optional[int]) -> Optional[int]:
        """Allow None; otherwise require an age from 1 to 150 (inclusive)."""
        if value is None or 1 <= value <= 150:
            return value
        raise ValueError("年龄必须在 1-150 之间")
class DataClearInput(BaseModel):
    """Request body describing which of a user's data should be cleared."""

    user_id: int
    mode: str  # "all" | "range" | "type"
    date_from: Optional[str] = None
    date_to: Optional[str] = None
    data_types: Optional[list[str]] = None

    @field_validator("mode")
    @classmethod
    def validate_mode(cls, value: str) -> str:
        """Accept only the three supported modes."""
        allowed_modes = ("all", "range", "type")
        if value in allowed_modes:
            return value
        raise ValueError("mode 必须是 'all', 'range' 或 'type'")
class ReadingInput(BaseModel):
    """Request body for creating a reading record."""

    title: str
    author: Optional[str] = None
    cover_url: Optional[str] = None
    duration: int
    mood: Optional[str] = None
    notes: Optional[str] = None
    date: Optional[str] = None

    @field_validator("title")
    @classmethod
    def validate_title(cls, value: str) -> str:
        """Require a non-blank title and return it stripped."""
        trimmed = value.strip()
        if not trimmed:
            raise ValueError("书名不能为空")
        return trimmed

    @field_validator("duration")
    @classmethod
    def validate_duration(cls, value: int) -> int:
        """Accept only durations from 1 to 1440 (inclusive)."""
        if 1 <= value <= 1440:
            return value
        raise ValueError("阅读时长必须在 1-1440 分钟之间")

    @field_validator("mood")
    @classmethod
    def validate_mood(cls, value: Optional[str]) -> Optional[str]:
        """Allow an empty/None mood, otherwise one of the five emoji."""
        accepted = ("😄", "😊", "😐", "😔", "😢")
        if not value or value in accepted:
            return value
        raise ValueError("心情必须是 😄😊😐😔😢 之一")
class ReadingResponse(BaseModel):
    """Reading record as returned by the API."""

    id: int
    user_id: int
    date: str
    title: str
    author: Optional[str]
    cover_url: Optional[str]
    duration: int
    mood: Optional[str]
    notes: Optional[str]
# ===== 认证相关模型 =====
class LoginInput(BaseModel):
    """Request body of ``POST /api/auth/login``."""

    username: str
    password: str
    # Passed to create_token and used by the login handler to choose between
    # a session cookie and a cookie with an explicit max-age.
    remember_me: bool = False
class RegisterInput(BaseModel):
    """Request body of ``POST /api/auth/register`` (invite code required)."""

    username: str
    password: str
    invite_code: str

    @field_validator("username")
    @classmethod
    def validate_username(cls, value: str) -> str:
        """Require a non-blank name of at most 50 characters; return it stripped."""
        trimmed = value.strip()
        if not trimmed:
            raise ValueError("用户名不能为空")
        # The length limit applies to the value as submitted (before stripping).
        if len(value) > 50:
            raise ValueError("用户名不能超过 50 个字符")
        return trimmed

    @field_validator("password")
    @classmethod
    def validate_password(cls, value: str) -> str:
        """Require a password of at least 6 characters."""
        if len(value) >= 6:
            return value
        raise ValueError("密码长度至少 6 位")
class TokenResponse(BaseModel):
    """Token plus the authenticated user's public information."""

    token: str
    # Forward reference: AuthUserResponse is declared after this class.
    user: "AuthUserResponse"
class AuthUserResponse(BaseModel):
    """Public information about an authenticated user."""

    id: int
    name: str
    is_admin: bool
    is_disabled: bool
class InviteResponse(BaseModel):
    """Invite code as returned by the admin API.

    ``created_at`` / ``expires_at`` are strings; the admin handlers fill them
    with ``.isoformat()``.
    """

    id: int
    code: str
    created_by: int
    used_by: Optional[int]
    created_at: str
    expires_at: Optional[str]
    is_used: bool
    is_expired: bool
class InviteInput(BaseModel):
    """Request body of ``POST /api/admin/invites``."""

    expires_days: Optional[int] = None  # optional number of days until expiry
# ===== 认证辅助函数 =====
# Paths that do not require authentication.
# NOTE(review): not referenced anywhere in the visible part of this file;
# AuthMiddleware uses its own PUBLIC_PREFIXES tuple — confirm whether this
# set is still needed.
PUBLIC_PATHS = {
    "/api/auth/login",
    "/api/auth/register",
    "/login",
    "/register",
    "/static",
    "/photos",
    "/docs",
    "/openapi.json",
}
def get_token_from_header(authorization: Optional[str] = Header(None)) -> Optional[str]:
    """Extract the bearer token from an ``Authorization`` header value.

    Returns the text following the ``"Bearer "`` prefix, or ``None`` when the
    header is missing/empty or does not start with that prefix.
    """
    scheme = "Bearer "
    if authorization and authorization.startswith(scheme):
        return authorization[len(scheme):]
    return None
def get_token_from_request(
    request: Request,
    authorization: Optional[str] = Header(None),
    auth_token: Optional[str] = Cookie(None),
) -> Optional[str]:
    """Return the token from the cookie or, failing that, from the header.

    The cookie takes priority; the ``Authorization`` header is the
    backward-compatible fallback.
    """
    return auth_token or get_token_from_header(authorization)
def get_current_user(
    request: Request,
    authorization: Optional[str] = Header(None),
    auth_token: Optional[str] = Cookie(None),
) -> Optional[User]:
    """Return the logged-in user, or ``None`` when not logged in.

    ``None`` is returned when there is no token, the token does not decode,
    the user cannot be loaded, or the user is disabled.
    """
    token = get_token_from_request(request, authorization, auth_token)
    payload = decode_token(token) if token else None
    if not payload:
        return None

    account = db.get_user(payload["user_id"])
    if account and not account.is_disabled:
        return account
    return None
def require_user(
    request: Request,
    authorization: Optional[str] = Header(None),
    auth_token: Optional[str] = Cookie(None),
) -> User:
    """Return the logged-in user or raise a 401 ``HTTPException``."""
    current = get_current_user(request, authorization, auth_token)
    if current:
        return current
    raise HTTPException(status_code=401, detail="未登录或登录已过期")
def require_admin(
    request: Request,
    authorization: Optional[str] = Header(None),
    auth_token: Optional[str] = Cookie(None),
) -> User:
    """Return the logged-in admin user.

    Raises a 401 ``HTTPException`` (via ``require_user``) when not logged in
    and a 403 when the user is not an administrator.
    """
    current = require_user(request, authorization, auth_token)
    if current.is_admin:
        return current
    raise HTTPException(status_code=403, detail="需要管理员权限")
def get_current_user_id(
    request: Request,
    authorization: Optional[str] = Header(None),
    auth_token: Optional[str] = Cookie(None),
) -> int:
    """Return the current user's id (compatibility mode).

    When nobody is logged in, the active user's id is used instead, falling
    back to ``1`` when there is no active user.
    """
    current = get_current_user(request, authorization, auth_token)
    if current:
        return current.id

    # Compatibility mode: fall back to the currently active user.
    db.ensure_default_user()
    fallback = db.get_active_user()
    if fallback:
        return fallback.id
    return 1
# ===== API 路由 =====
# ===== 认证 API =====
@app.post("/api/auth/login")
async def login(data: LoginInput):
    """Log a user in.

    Returns a JSON body with the token and basic user info and sets the
    ``auth_token`` HTTP-only cookie. Raises 401 for an unknown user, a user
    without a password hash or a wrong password, and 403 for a disabled
    account.
    """
    account = db.get_user_by_name(data.username)
    if not account:
        raise HTTPException(status_code=401, detail="用户名或密码错误")
    if not account.password_hash:
        raise HTTPException(status_code=401, detail="用户未设置密码,请联系管理员")
    if not verify_password(data.password, account.password_hash):
        raise HTTPException(status_code=401, detail="用户名或密码错误")
    if account.is_disabled:
        raise HTTPException(status_code=403, detail="账户已被禁用")

    # remember_me is forwarded to create_token (it selects the expiry there).
    token = create_token(account.id, account.name, account.is_admin, data.remember_me)

    user_info = {
        "id": account.id,
        "name": account.name,
        "is_admin": account.is_admin,
        "is_disabled": account.is_disabled,
    }
    response = JSONResponse(content={"token": token, "user": user_info})

    # Without remember_me the cookie has no max-age, i.e. it is a session cookie.
    cookie_max_age = None
    if data.remember_me:
        cookie_max_age = get_token_expire_seconds(data.remember_me)

    response.set_cookie(
        key="auth_token",
        value=token,
        httponly=True,
        secure=False,  # plain-HTTP setup; should be True in production
        samesite="lax",
        max_age=cookie_max_age,
        path="/",  # make the cookie available on every path
    )
    return response
@app.post("/api/auth/register")
async def register(data: RegisterInput):
    """Register a new user (an invite code is required).

    Raises 400 when the invite code is unknown, used or expired, or when the
    username is already taken. On success the invite is marked as used and
    the response mirrors the login response (token, user info, cookie).
    """
    # Validate the invite code.
    invite = db.get_invite_by_code(data.invite_code)
    if not invite:
        raise HTTPException(status_code=400, detail="邀请码无效")
    if invite.is_used:
        raise HTTPException(status_code=400, detail="邀请码已被使用")
    if invite.is_expired:
        raise HTTPException(status_code=400, detail="邀请码已过期")

    # The username must not exist yet.
    if db.get_user_by_name(data.username):
        raise HTTPException(status_code=400, detail="用户名已存在")

    # Create the user.
    candidate = User(
        name=data.username,
        password_hash=hash_password(data.password),
        is_active=False,
        is_admin=False,
        is_disabled=False,
    )
    user_id = db.add_user(candidate)

    # Mark the invite code as used, then reload the stored user.
    db.mark_invite_used(invite.id, user_id)
    account = db.get_user(user_id)

    token = create_token(account.id, account.name, account.is_admin)
    user_info = {
        "id": account.id,
        "name": account.name,
        "is_admin": account.is_admin,
        "is_disabled": account.is_disabled,
    }
    response = JSONResponse(content={"token": token, "user": user_info})

    # Cookie lifetime is the non-"remember me" expiry.
    response.set_cookie(
        key="auth_token",
        value=token,
        httponly=True,
        secure=False,
        samesite="lax",
        max_age=get_token_expire_seconds(False),
        path="/",
    )
    return response
@app.post("/api/auth/logout")
async def logout(response: Response):
    """Log the user out by deleting the ``auth_token`` cookie."""
    response.delete_cookie("auth_token")
    result = {"message": "登出成功"}
    return result
@app.get("/api/auth/me", response_model=AuthUserResponse)
async def get_me(user: User = Depends(require_user)):
    """Return the currently logged-in user's public information."""
    fields = {
        "id": user.id,
        "name": user.name,
        "is_admin": user.is_admin,
        "is_disabled": user.is_disabled,
    }
    return AuthUserResponse(**fields)
# ===== 管理员 API =====
@app.get("/api/admin/users")
async def admin_get_users(admin: User = Depends(require_admin)):
    """List all users (admin only)."""
    listing = []
    for account in db.get_users():
        listing.append(
            {
                "id": account.id,
                "name": account.name,
                "email": account.email,
                "is_admin": account.is_admin,
                "is_disabled": account.is_disabled,
                "created_at": account.created_at.isoformat(),
            }
        )
    return listing
@app.post("/api/admin/users/{user_id}/disable")
async def admin_disable_user(user_id: int, admin: User = Depends(require_admin)):
    """Disable a user (admin only).

    Raises 400 when the admin targets their own account and 404 when the
    user does not exist.
    """
    if admin.id == user_id:
        raise HTTPException(status_code=400, detail="不能禁用自己")

    target = db.get_user(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")

    target.is_disabled = True
    db.update_user(target)
    return {"message": "用户已禁用"}
@app.post("/api/admin/users/{user_id}/enable")
async def admin_enable_user(user_id: int, admin: User = Depends(require_admin)):
    """Re-enable a user (admin only); raises 404 when the user does not exist."""
    target = db.get_user(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")

    target.is_disabled = False
    db.update_user(target)
    return {"message": "用户已启用"}
@app.delete("/api/admin/users/{user_id}")
async def admin_delete_user(user_id: int, admin: User = Depends(require_admin)):
    """Delete a user and their data (admin only).

    Raises 400 when the admin targets their own account and 404 when the
    user does not exist.
    """
    if admin.id == user_id:
        raise HTTPException(status_code=400, detail="不能删除自己")

    if not db.get_user(user_id):
        raise HTTPException(status_code=404, detail="用户不存在")

    # Clear the user's data first, then remove the user itself.
    db.clear_data(user_id, mode="all")
    db.delete_user(user_id)
    return {"message": "用户已删除"}
@app.get("/api/admin/invites", response_model=list[InviteResponse])
async def admin_get_invites(admin: User = Depends(require_admin)):
    """List all invite codes (admin only)."""
    listing = []
    for invite in db.get_invites():
        expiry = invite.expires_at.isoformat() if invite.expires_at else None
        listing.append(
            InviteResponse(
                id=invite.id,
                code=invite.code,
                created_by=invite.created_by,
                used_by=invite.used_by,
                created_at=invite.created_at.isoformat(),
                expires_at=expiry,
                is_used=invite.is_used,
                is_expired=invite.is_expired,
            )
        )
    return listing
@app.post("/api/admin/invites", response_model=InviteResponse)
async def admin_create_invite(data: InviteInput, admin: User = Depends(require_admin)):
    """Generate a new invite code (admin only).

    When ``expires_days`` is truthy the invite expires that many days from
    now; otherwise no expiry is set. The stored invite is re-read by its code
    to build the response.
    """
    code = generate_invite_code()

    if data.expires_days:
        expires_at = datetime.now() + timedelta(days=data.expires_days)
    else:
        expires_at = None

    db.add_invite(Invite(code=code, created_by=admin.id, expires_at=expires_at))

    stored = db.get_invite_by_code(code)
    stored_expiry = stored.expires_at.isoformat() if stored.expires_at else None
    return InviteResponse(
        id=stored.id,
        code=stored.code,
        created_by=stored.created_by,
        used_by=stored.used_by,
        created_at=stored.created_at.isoformat(),
        expires_at=stored_expiry,
        is_used=stored.is_used,
        is_expired=stored.is_expired,
    )
@app.delete("/api/admin/invites/{invite_id}")
async def admin_delete_invite(invite_id: int, admin: User = Depends(require_admin)):
    """Delete an invite code (admin only)."""
    db.delete_invite(invite_id)
    confirmation = {"message": "邀请码已删除"}
    return confirmation
# ===== API 密钥管理(管理员) =====
@app.get("/api/admin/api-keys")
async def admin_get_api_keys(admin: User = Depends(require_admin)):
    """Return the status of all API keys, requested in masked form."""
    masked_keys = db.get_all_api_keys(masked=True)
    return masked_keys
@app.get("/api/admin/api-keys/{provider}")
async def admin_get_api_key(provider: str, admin: User = Depends(require_admin)):
    """Return the full value of one provider's API key.

    Raises 400 when ``provider`` is not a key of ``db.API_KEY_ENV_MAP``.
    """
    if provider not in db.API_KEY_ENV_MAP:
        raise HTTPException(status_code=400, detail=f"未知的 provider: {provider}")

    key_value = db.get_api_key(provider)
    display_name = db.API_KEY_NAMES.get(provider, provider)
    return {"provider": provider, "name": display_name, "value": key_value}
class ApiKeyInput(BaseModel):
    """Request body of ``PUT /api/admin/api-keys/{provider}``."""

    value: str
@app.put("/api/admin/api-keys/{provider}")
async def admin_set_api_key(provider: str, data: ApiKeyInput, admin: User = Depends(require_admin)):
    """Create or update a provider's API key.

    Raises 400 when ``provider`` is not a key of ``db.API_KEY_ENV_MAP``.
    """
    known_providers = db.API_KEY_ENV_MAP
    if provider not in known_providers:
        raise HTTPException(status_code=400, detail=f"未知的 provider: {provider}")

    db.set_api_key(provider, data.value)
    return {"message": "API Key 已保存", "provider": provider}
@app.delete("/api/admin/api-keys/{provider}")
async def admin_delete_api_key(provider: str, admin: User = Depends(require_admin)):
    """Delete a provider's stored API key (falls back to the environment variable).

    Raises 400 when ``provider`` is not a key of ``db.API_KEY_ENV_MAP``.
    """
    known_providers = db.API_KEY_ENV_MAP
    if provider not in known_providers:
        raise HTTPException(status_code=400, detail=f"未知的 provider: {provider}")

    db.delete_api_key(provider)
    return {"message": "API Key 已删除,将使用环境变量配置", "provider": provider}
# ===== 页面路由 =====
@app.get("/login")
async def login_page():
    """Serve the login page."""
    html = get_login_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/register")
async def register_page():
    """Serve the registration page."""
    html = get_register_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/admin")
async def admin_page():
    """Serve the admin console page."""
    html = get_admin_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/")
async def root():
    """Serve the home page (dashboard)."""
    html = get_dashboard_html()
    return HTMLResponse(html, status_code=200)
@app.get("/exercise")
async def exercise_page():
    """Serve the exercise page."""
    html = get_exercise_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/meal")
async def meal_page():
    """Serve the meal page."""
    html = get_meal_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/report")
async def report_page():
    """Serve the report page."""
    html = get_report_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/api/report/week")
async def download_week_report(format: str = Query(default="pdf")):
    """Generate the weekly report and return it as a download.

    ``format`` is forwarded unchanged to ``_export_report_file``.
    """
    from ..core.report import export_report, generate_weekly_report

    return _export_report_file(generate_weekly_report(), format, "weekly", export_report)
@app.get("/api/report/month")
async def download_month_report(
    year: Optional[int] = Query(default=None),
    month: Optional[int] = Query(default=None),
    format: str = Query(default="pdf"),
):
    """Generate the monthly report and return it as a download.

    ``year`` and ``month`` are passed straight to ``generate_monthly_report``
    (both may be ``None``); ``format`` is forwarded to ``_export_report_file``.
    """
    from ..core.report import export_report, generate_monthly_report

    monthly = generate_monthly_report(year, month)
    return _export_report_file(monthly, format, "monthly", export_report)
@app.get("/sleep")
async def sleep_page():
    """Serve the sleep page."""
    html = get_sleep_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/weight")
async def weight_page():
    """Serve the weight page."""
    html = get_weight_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/settings")
async def settings_page():
    """Serve the settings page."""
    html = get_settings_page_html()
    return HTMLResponse(html, status_code=200)
@app.get("/api/config", response_model=ConfigResponse)
async def get_config():
    """Return the configuration of the active user (user id 1 when none is active)."""
    active_user = db.get_active_user()
    if active_user:
        user_id = active_user.id
    else:
        user_id = 1

    cfg = db.get_config(user_id)
    return ConfigResponse(
        age=cfg.age,
        gender=cfg.gender,
        height=cfg.height,
        weight=cfg.weight,
        activity_level=cfg.activity_level,
        goal=cfg.goal,
        bmr=cfg.bmr,
        tdee=cfg.tdee,
    )
@app.get("/api/today", response_model=TodaySummary)
async def get_today_summary():
    """Return today's overview for the active user.

    Raises 400 when there is no active user. The calorie balance is
    ``intake - (tdee + exercise calories)``, with a missing TDEE counted as 0.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    today = date.today()
    uid = active_user.id
    config = db.get_config(uid)

    # Today's records.
    exercises = db.get_exercises(start_date=today, end_date=today, user_id=uid)
    meals = db.get_meals(start_date=today, end_date=today, user_id=uid)
    sleep_records = db.get_sleep_records(start_date=today, end_date=today, user_id=uid)
    # Query kept from the original flow; its result is not used below.
    weight_records = db.get_weight_records(start_date=today, end_date=today, user_id=uid)

    # Calories: base expenditure (TDEE) plus exercise versus intake.
    calories_intake = sum(meal.calories for meal in meals)
    calories_burned = sum(ex.calories for ex in exercises)
    calories_balance = calories_intake - ((config.tdee or 0) + calories_burned)

    # First sleep record of the day, if any.
    sleep = sleep_records[0] if sleep_records else None

    # NOTE(review): called without a user id, unlike the queries above —
    # confirm which user's weight this returns.
    latest_weight = db.get_latest_weight()

    meal_items = []
    for meal in meals:
        meal_items.append(
            MealResponse(
                id=meal.id,
                date=meal.date.isoformat(),
                meal_type=meal.meal_type,
                description=meal.description,
                calories=meal.calories,
                protein=meal.protein,
                carbs=meal.carbs,
                fat=meal.fat,
                photo_path=meal.photo_path,
            )
        )

    exercise_items = []
    for ex in exercises:
        exercise_items.append(
            ExerciseResponse(
                id=ex.id,
                date=ex.date.isoformat(),
                type=ex.type,
                duration=ex.duration,
                calories=ex.calories,
                distance=ex.distance,
                heart_rate_avg=ex.heart_rate_avg,
                source=ex.source,
                notes=ex.notes,
            )
        )

    return TodaySummary(
        date=today.isoformat(),
        calories_intake=calories_intake,
        calories_burned=calories_burned,
        calories_balance=calories_balance,
        exercise_count=len(exercises),
        exercise_duration=sum(ex.duration for ex in exercises),
        sleep_duration=sleep.duration if sleep else None,
        sleep_quality=sleep.quality if sleep else None,
        weight=latest_weight.weight_kg if latest_weight else None,
        tdee=config.tdee,
        meals=meal_items,
        exercises=exercise_items,
    )
@app.get("/api/week", response_model=WeekSummary)
async def get_week_summary():
    """Return this week's (Monday to Sunday) summary for the active user.

    Raises 400 when there is no active user. Average intake is divided by the
    number of distinct days that have meals; sleep averages are 0 when there
    are no sleep records.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    today = date.today()
    start_of_week = today - timedelta(days=today.weekday())
    end_of_week = start_of_week + timedelta(days=6)
    window = {
        "start_date": start_of_week,
        "end_date": end_of_week,
        "user_id": active_user.id,
    }

    # This week's records.
    exercises = db.get_exercises(**window)
    meals = db.get_meals(**window)
    sleep_records = db.get_sleep_records(**window)
    weight_records = db.get_weight_records(**window)

    # Totals.
    total_exercise_duration = sum(ex.duration for ex in exercises)
    total_calories_burned = sum(ex.calories for ex in exercises)
    total_calories_intake = sum(meal.calories for meal in meals)

    # Averages.
    days_with_meals = len({meal.date for meal in meals}) or 1
    avg_calories_intake = total_calories_intake // days_with_meals
    if sleep_records:
        nights = len(sleep_records)
        avg_sleep_duration = sum(s.duration for s in sleep_records) / nights
        avg_sleep_quality = sum(s.quality for s in sleep_records) / nights
    else:
        avg_sleep_duration = 0
        avg_sleep_quality = 0

    # Weight change between the earliest and latest record of the week.
    ordered_weights = sorted(weight_records, key=lambda w: w.date)
    if ordered_weights:
        weight_start = ordered_weights[0].weight_kg
        weight_end = ordered_weights[-1].weight_kg
    else:
        weight_start = None
        weight_end = None
    if weight_start and weight_end:
        weight_change = round(weight_end - weight_start, 2)
    else:
        weight_change = None

    # Per-day statistics, Monday first.
    weekday_labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
    daily_stats = []
    for offset, label in enumerate(weekday_labels):
        day = start_of_week + timedelta(days=offset)
        day_exercises = [ex for ex in exercises if ex.date == day]
        day_meals = [meal for meal in meals if meal.date == day]
        day_sleep = next((s for s in sleep_records if s.date == day), None)
        daily_stats.append({
            "date": day.isoformat(),
            "weekday": label,
            "exercise_duration": sum(ex.duration for ex in day_exercises),
            "calories_intake": sum(meal.calories for meal in day_meals),
            "calories_burned": sum(ex.calories for ex in day_exercises),
            "sleep_duration": day_sleep.duration if day_sleep else None,
        })

    return WeekSummary(
        start_date=start_of_week.isoformat(),
        end_date=end_of_week.isoformat(),
        total_exercise_count=len(exercises),
        total_exercise_duration=total_exercise_duration,
        total_calories_burned=total_calories_burned,
        avg_calories_intake=avg_calories_intake,
        avg_sleep_duration=round(avg_sleep_duration, 1),
        avg_sleep_quality=round(avg_sleep_quality, 1),
        weight_start=weight_start,
        weight_end=weight_end,
        weight_change=weight_change,
        daily_stats=daily_stats,
    )
@app.get("/api/exercises", response_model=list[ExerciseResponse])
async def get_exercises(
    days: int = Query(default=30, ge=1, le=365, description="查询天数"),
):
    """Return the active user's exercise records from ``today - days`` to today.

    Raises 400 when there is no active user.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    end_date = date.today()
    start_date = end_date - timedelta(days=days)
    records = db.get_exercises(start_date=start_date, end_date=end_date, user_id=active_user.id)

    payload = []
    for record in records:
        payload.append(
            ExerciseResponse(
                id=record.id,
                date=record.date.isoformat(),
                type=record.type,
                duration=record.duration,
                calories=record.calories,
                distance=record.distance,
                heart_rate_avg=record.heart_rate_avg,
                source=record.source,
                notes=record.notes,
            )
        )
    return payload
@app.get("/api/exercises/stats")
async def get_exercise_stats(
    days: int = Query(default=30, ge=1, le=365, description="统计天数"),
):
    """Return exercise statistics for the active user.

    The response contains three sections:

    * ``month``: count, total duration, total calories and most frequent
      type for the current calendar month up to today.
    * ``period``: per-type counts and calories over the last ``days`` days
      (today included).
    * ``daily_stats``: one entry per day of that period with the summed
      duration and calories.

    Raises 400 when there is no active user.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    today = date.today()
    period_start = today - timedelta(days=days - 1)
    month_start = date(today.year, today.month, 1)

    # Current-month figures.
    month_exercises = db.get_exercises(
        start_date=month_start, end_date=today, user_id=active_user.id
    )
    month_type_counts: dict[str, int] = {}
    for e in month_exercises:
        month_type_counts[e.type] = month_type_counts.get(e.type, 0) + 1
    top_type = max(month_type_counts, key=month_type_counts.get) if month_type_counts else None

    # Period figures. The query is scoped to the active user, like the month
    # query above; without user_id the statistics were not restricted to the
    # active user.
    period_exercises = db.get_exercises(
        start_date=period_start, end_date=today, user_id=active_user.id
    )

    # Single pass: per-type totals plus per-day buckets, so the daily loop
    # below does not rescan every record for every day.
    type_counts: dict[str, int] = {}
    type_calories: dict[str, int] = {}
    duration_by_day: dict = {}
    calories_by_day: dict = {}
    for e in period_exercises:
        type_counts[e.type] = type_counts.get(e.type, 0) + 1
        type_calories[e.type] = type_calories.get(e.type, 0) + e.calories
        duration_by_day[e.date] = duration_by_day.get(e.date, 0) + e.duration
        calories_by_day[e.date] = calories_by_day.get(e.date, 0) + e.calories

    daily_stats = []
    for i in range(days):
        day = period_start + timedelta(days=i)
        daily_stats.append({
            "date": day.isoformat(),
            "duration": duration_by_day.get(day, 0),
            "calories": calories_by_day.get(day, 0),
        })

    return {
        "month": {
            "count": len(month_exercises),
            "duration": sum(e.duration for e in month_exercises),
            "calories": sum(e.calories for e in month_exercises),
            "top_type": top_type,
        },
        "period": {
            "days": days,
            "type_counts": type_counts,
            "type_calories": type_calories,
        },
        "daily_stats": daily_stats,
    }
@app.get("/api/meals", response_model=list[MealResponse])
async def get_meals(
    days: int = Query(default=30, ge=1, le=365, description="查询天数"),
):
    """Return the active user's meal records from ``today - days`` to today.

    Raises 400 when there is no active user.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    end_date = date.today()
    start_date = end_date - timedelta(days=days)
    records = db.get_meals(start_date=start_date, end_date=end_date, user_id=active_user.id)

    payload = []
    for record in records:
        payload.append(
            MealResponse(
                id=record.id,
                date=record.date.isoformat(),
                meal_type=record.meal_type,
                description=record.description,
                calories=record.calories,
                protein=record.protein,
                carbs=record.carbs,
                fat=record.fat,
                photo_path=record.photo_path,
            )
        )
    return payload
@app.post("/api/exercise")
async def add_exercise_api(data: ExerciseInput):
    """Add an exercise record for the active user.

    When ``calories`` is omitted it is estimated from the exercise type,
    duration and the configured weight (70 when no weight is configured).
    Raises 400 when there is no active user or the date is not ISO formatted.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    try:
        record_date = date.fromisoformat(data.date)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc

    from ..core.calories import estimate_exercise_calories

    config = db.get_config(active_user.id)
    weight_kg = config.weight or 70

    # An explicit calorie value wins over the estimate.
    if data.calories is not None:
        calories = data.calories
    else:
        calories = estimate_exercise_calories(data.type, data.duration, weight_kg)

    new_record = Exercise(
        date=record_date,
        type=data.type,
        duration=data.duration,
        calories=calories,
        distance=data.distance,
        heart_rate_avg=data.heart_rate_avg,
        notes=data.notes,
        source="web",
    )
    record_id = db.add_exercise(new_record, user_id=active_user.id)
    return {"success": True, "id": record_id}
@app.delete("/api/exercise/{exercise_id}")
async def delete_exercise_api(exercise_id: int):
    """Delete an exercise record by id.

    Any exception raised by the database layer is reported as a 500 whose
    detail is the exception text; the original exception is chained as the
    cause so it stays visible in tracebacks.
    """
    # NOTE(review): the record is deleted by id only; no ownership check is
    # visible here — confirm whether db.delete_exercise enforces one.
    try:
        db.delete_exercise(exercise_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"success": True}
@app.post("/api/meal")
async def add_meal_api(
    date_str: str = Form(...),
    meal_type: str = Form(...),
    description: str = Form(""),
    calories: Optional[str] = Form(None),
    protein: Optional[str] = Form(None),
    carbs: Optional[str] = Form(None),
    fat: Optional[str] = Form(None),
    photo: Optional[UploadFile] = File(None),
):
    """Add a meal record for the active user (photo upload supported).

    Numeric fields arrive as form strings; blank values are treated as
    missing. When no calories are supplied but a description is, calories and
    macros are estimated from the description. An uploaded photo (max 5MB) is
    stored under ``~/.vitals/photos/<YYYY-MM>/``.

    Raises 400 when there is no active user, the date is not ISO formatted,
    a numeric field cannot be parsed, or the photo is too large.
    """
    # Local import: only needed to sanitise the photo filename.
    import re

    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    def _parse_optional_int(value: Optional[str], field_name: str) -> Optional[int]:
        """Parse a form value as int; None/blank yields None, garbage yields 400."""
        if value is None:
            return None
        value = value.strip()
        if value == "":
            return None
        try:
            return int(value)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=f"{field_name} 必须是整数") from exc

    def _parse_optional_float(value: Optional[str], field_name: str) -> Optional[float]:
        """Parse a form value as float; None/blank yields None, garbage yields 400."""
        if value is None:
            return None
        value = value.strip()
        if value == "":
            return None
        try:
            return float(value)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=f"{field_name} 必须是数字") from exc

    try:
        record_date = date.fromisoformat(date_str)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc

    # Validate every numeric field before touching the filesystem, so a
    # rejected request cannot leave an orphaned photo behind.
    calories_i = _parse_optional_int(calories, "卡路里")
    protein_f = _parse_optional_float(protein, "蛋白质")
    carbs_f = _parse_optional_float(carbs, "碳水")
    fat_f = _parse_optional_float(fat, "脂肪")

    photo_path = None
    if photo:
        content = await photo.read()
        if len(content) > 5 * 1024 * 1024:
            raise HTTPException(status_code=400, detail="照片不能超过 5MB")

        # Use the parsed date rather than the raw string so the month folder
        # and filename prefix are always in YYYY-MM(-DD) form.
        day_label = record_date.isoformat()
        photos_root = Path.home() / ".vitals" / "photos" / day_label[:7]
        photos_root.mkdir(parents=True, exist_ok=True)

        suffix = Path(photo.filename or "").suffix.lower() or ".jpg"
        # meal_type is untrusted form input: keep word characters (including
        # non-ASCII letters) and '-', and replace everything else — spaces,
        # path separators, dots — so it cannot alter the target path.
        safe_type = re.sub(r"[^\w-]", "_", meal_type)
        timestamp = datetime.now().strftime("%H%M%S")
        file_path = photos_root / f"{day_label}_{safe_type}_{timestamp}{suffix}"
        file_path.write_bytes(content)
        photo_path = str(file_path)

    if calories_i is None and description:
        # No explicit calories: estimate calories and macros from the text.
        from ..core.calories import estimate_meal_calories

        result = estimate_meal_calories(description)
        calories_i = result["total_calories"]
        protein_f = result["total_protein"]
        carbs_f = result["total_carbs"]
        fat_f = result["total_fat"]
    else:
        calories_i = calories_i or 0

    meal = Meal(
        date=record_date,
        meal_type=meal_type,
        description=description,
        calories=calories_i,
        protein=protein_f,
        carbs=carbs_f,
        fat=fat_f,
        photo_path=photo_path,
        food_items=None,
    )
    record_id = db.add_meal(meal, user_id=active_user.id)
    return {"success": True, "id": record_id, "calories": calories_i}
@app.delete("/api/meal/{meal_id}")
async def delete_meal_api(meal_id: int):
    """Delete a meal record by id.

    Any exception raised by the database layer is reported as a 500 whose
    detail is the exception text; the original exception is chained as the
    cause so it stays visible in tracebacks.
    """
    # NOTE(review): the record is deleted by id only; no ownership check is
    # visible here — confirm whether db.delete_meal enforces one.
    try:
        db.delete_meal(meal_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"success": True}
@app.post("/api/meal/recognize")
async def recognize_food(
    text: Optional[str] = Form(None),
    image: Optional[UploadFile] = File(None),
    provider: str = Form("qwen"),
):
    """Smart food recognition.

    Two kinds of input are supported (the image wins when both are given):

    - text: a free-text description of what was eaten
    - image: a photo of the food (max 10MB)

    provider: "qwen" | "deepseek" | "claude" | "local". For text input,
    providers other than "qwen" and "deepseek" use the local estimator.

    Raises 400 when neither input is given, the image is too large, or the
    analysis raises ``ValueError``; any other failure becomes a 500.
    """
    if not text and not image:
        raise HTTPException(status_code=400, detail="请提供文字描述或上传图片")
    try:
        if image:
            # Image recognition.
            import tempfile
            from ..vision.analyzer import get_analyzer

            # Check the size before creating the temp file so an oversized
            # upload never leaves a file behind.
            content = await image.read()
            if len(content) > 10 * 1024 * 1024:
                raise HTTPException(status_code=400, detail="图片不能超过 10MB")

            suffix = Path(image.filename or "").suffix.lower() or ".jpg"
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
            tmp_path = Path(tmp.name)
            try:
                # Write and close the file before handing its path to the analyzer.
                with tmp:
                    tmp.write(content)
                analyzer = get_analyzer(provider=provider)
                if hasattr(analyzer, "analyze_image"):
                    result = analyzer.analyze_image(tmp_path)
                else:
                    result = analyzer.analyze(tmp_path)
            finally:
                # The temp file is removed whether or not the analysis succeeded.
                tmp_path.unlink(missing_ok=True)
        else:
            # Text recognition.
            if provider == "qwen":
                from ..vision.providers.qwen import get_qwen_analyzer
                analyzer = get_qwen_analyzer()
                result = analyzer.analyze_text(text)
            elif provider == "deepseek":
                from ..vision.providers.deepseek import get_deepseek_analyzer
                analyzer = get_deepseek_analyzer()
                result = analyzer.analyze_text(text)
            else:
                # Claude and local only use the local estimate.
                from ..core.calories import estimate_meal_calories
                result = estimate_meal_calories(text)
                result["description"] = text
                result["provider"] = provider
        return {
            "success": True,
            "description": result.get("description", ""),
            "total_calories": result.get("total_calories", 0),
            "total_protein": result.get("total_protein", 0),
            "total_carbs": result.get("total_carbs", 0),
            "total_fat": result.get("total_fat", 0),
            "items": result.get("items", []),
            "provider": result.get("provider", provider),
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 for an oversized image) must
        # not be rewrapped as a 500 by the generic handler below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"识别失败: {str(e)}") from e
@app.post("/api/sleep")
async def add_sleep_api(data: SleepInput):
    """Create a sleep record for the currently active user.

    Args:
        data: Request payload; ``date``, ``bedtime``, ``wake_time``,
            ``duration``, ``quality`` and ``notes`` are read from it.

    Returns:
        ``{"success": True, "id": <new record id>}``.

    Raises:
        HTTPException: 400 when there is no active user, the date is not
            ISO formatted, or a bedtime / wake time is not a valid ISO time.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="没有激活的用户")
    try:
        record_date = date.fromisoformat(data.date)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc
    # Malformed times are client errors too; without this guard the
    # ValueError would surface as an unhandled 500.
    try:
        bedtime = time.fromisoformat(data.bedtime) if data.bedtime else None
        wake_time = time.fromisoformat(data.wake_time) if data.wake_time else None
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="时间格式应为 HH:MM 或 HH:MM:SS") from exc
    sleep = Sleep(
        date=record_date,
        bedtime=bedtime,
        wake_time=wake_time,
        duration=data.duration,
        quality=data.quality,
        notes=data.notes,
        source="web",
    )
    record_id = db.add_sleep(sleep, user_id=active_user.id)
    return {"success": True, "id": record_id}
@app.delete("/api/sleep/{sleep_id}")
async def delete_sleep_api(sleep_id: int):
    """Delete a sleep record.

    Args:
        sleep_id: Identifier of the sleep record, taken from the URL path.

    Returns:
        ``{"success": True}`` when ``db.delete_sleep`` completes without raising.

    Raises:
        HTTPException: 500 carrying the underlying error text when the
            deletion raises.
    """
    try:
        db.delete_sleep(sleep_id)
    except Exception as exc:
        # Chain the cause so the original traceback stays attached to the
        # HTTP error, matching the other handlers in this module.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"success": True}
@app.post("/api/weight")
async def add_weight_api(data: WeightInput):
    """Store a weight measurement for the currently active user.

    Args:
        data: Request payload; ``date``, ``weight_kg``, ``body_fat_pct``,
            ``muscle_mass`` and ``notes`` are read from it.

    Returns:
        ``{"success": True, "id": <new record id>}``.

    Raises:
        HTTPException: 400 when there is no active user or the date is not
            ISO formatted.
    """
    owner = db.get_active_user()
    if not owner:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    try:
        measured_on = date.fromisoformat(data.date)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc

    # Collect the model fields first, then build the record in one step.
    fields = {
        "date": measured_on,
        "weight_kg": data.weight_kg,
        "body_fat_pct": data.body_fat_pct,
        "muscle_mass": data.muscle_mass,
        "notes": data.notes,
    }
    new_id = db.add_weight(Weight(**fields), user_id=owner.id)
    return {"success": True, "id": new_id}
@app.delete("/api/weight/{weight_id}")
async def delete_weight_api(weight_id: int):
    """Delete a weight record.

    Args:
        weight_id: Identifier of the weight record, taken from the URL path.

    Returns:
        ``{"success": True}`` when ``db.delete_weight`` completes without raising.

    Raises:
        HTTPException: 500 carrying the underlying error text when the
            deletion raises.
    """
    try:
        db.delete_weight(weight_id)
    except Exception as exc:
        # Chain the cause so the original traceback stays attached to the
        # HTTP error, matching the other handlers in this module.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"success": True}
@app.get("/api/meals/nutrition")
async def get_meal_nutrition_stats(
    days: int = Query(default=30, ge=1, le=365, description="统计天数"),
):
    """Summarize meal nutrition for the active user over the last ``days`` days.

    The window ends today and starts ``days - 1`` days earlier. The per-day
    breakdown covers at most the final 7 days of that window.

    Returns:
        A dict with ``days``, ``total_calories``, ``macros`` (protein, carbs
        and fat rounded to one decimal), ``meal_types`` (record counts and
        calories per meal type) and ``daily_stats``.

    Raises:
        HTTPException: 400 when there is no active user.
    """
    user = db.get_active_user()
    if not user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    def _totals(entries):
        """Sum calories and macros of ``entries``; missing macros count as 0."""
        return {
            "calories": sum(e.calories for e in entries),
            "protein": sum(e.protein or 0 for e in entries),
            "carbs": sum(e.carbs or 0 for e in entries),
            "fat": sum(e.fat or 0 for e in entries),
        }

    today = date.today()
    window_start = today - timedelta(days=days - 1)
    meals = db.get_meals(start_date=window_start, end_date=today, user_id=user.id)
    overall = _totals(meals)

    # Per meal-type tallies, keyed in order of first appearance.
    type_counts: dict[str, int] = {}
    type_calories: dict[str, int] = {}
    for meal in meals:
        kind = meal.meal_type
        type_counts[kind] = type_counts.get(kind, 0) + 1
        type_calories[kind] = type_calories.get(kind, 0) + meal.calories

    # Daily breakdown for the most recent days (capped at one week).
    span = min(days, 7)
    first_day = today - timedelta(days=span - 1)
    daily_stats = []
    for offset in range(span):
        day = first_day + timedelta(days=offset)
        eaten = [m for m in meals if m.date == day]
        daily_stats.append({"date": day.isoformat(), **_totals(eaten)})

    macros = {
        "protein": round(overall["protein"], 1),
        "carbs": round(overall["carbs"], 1),
        "fat": round(overall["fat"], 1),
    }
    return {
        "days": days,
        "total_calories": overall["calories"],
        "macros": macros,
        "meal_types": {
            "counts": type_counts,
            "calories": type_calories,
        },
        "daily_stats": daily_stats,
    }
@app.get("/api/sleep", response_model=list[SleepResponse])
async def get_sleep_records(
    days: int = Query(default=30, ge=1, le=365, description="查询天数"),
):
    """List the active user's sleep records between ``days`` days ago and today.

    Returns:
        One ``SleepResponse`` per record; dates and times are rendered as
        ISO strings, and unset times become ``None``.

    Raises:
        HTTPException: 400 when there is no active user.
    """
    user = db.get_active_user()
    if not user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    def _clock(value):
        """Render an optional time value as ISO text."""
        return value.isoformat() if value else None

    until = date.today()
    since = until - timedelta(days=days)
    rows = db.get_sleep_records(start_date=since, end_date=until, user_id=user.id)

    payload = []
    for row in rows:
        payload.append(
            SleepResponse(
                id=row.id,
                date=row.date.isoformat(),
                bedtime=_clock(row.bedtime),
                wake_time=_clock(row.wake_time),
                duration=row.duration,
                quality=row.quality,
                deep_sleep_mins=row.deep_sleep_mins,
                source=row.source,
                notes=row.notes,
            )
        )
    return payload
@app.get("/api/weight", response_model=list[WeightResponse])
async def get_weight_records(
    days: int = Query(default=90, ge=1, le=365, description="查询天数"),
):
    """List the active user's weight records between ``days`` days ago and today.

    Returns:
        One ``WeightResponse`` per record, with the date as an ISO string.

    Raises:
        HTTPException: 400 when there is no active user.
    """
    user = db.get_active_user()
    if not user:
        raise HTTPException(status_code=400, detail="没有激活的用户")

    until = date.today()
    since = until - timedelta(days=days)
    rows = db.get_weight_records(start_date=since, end_date=until, user_id=user.id)

    payload = []
    for row in rows:
        payload.append(
            WeightResponse(
                id=row.id,
                date=row.date.isoformat(),
                weight_kg=row.weight_kg,
                body_fat_pct=row.body_fat_pct,
                muscle_mass=row.muscle_mass,
                notes=row.notes,
            )
        )
    return payload
@app.get("/api/weight/goal")
async def get_weight_goal():
    """Derive a target weight from the user's configuration.

    The config of the active user is used, falling back to user id 1 when
    no user is active. A "lose" goal targets 95% of the configured weight,
    a "gain" goal 105%, and any other goal the configured weight itself;
    the result is rounded to one decimal.

    Returns:
        ``{"goal_weight": <float or None>}``; ``None`` when the config has
        no weight set.
    """
    active = db.get_active_user()
    config = db.get_config(active.id if active else 1)

    current = config.weight
    if not current:
        return {"goal_weight": None}

    goal = config.goal
    if goal == "lose":
        target = current * 0.95
    elif goal == "gain":
        target = current * 1.05
    else:
        target = current
    return {"goal_weight": round(target, 1)}
# ===== 阅读 API =====
@app.get("/reading")
async def reading_page():
    """Serve the reading page as an HTML response with status 200."""
    html = get_reading_page_html()
    return HTMLResponse(content=html, status_code=200)
@app.get("/api/reading", response_model=list[ReadingResponse])
async def get_readings(days: int = 30):
    """List the active user's reading records for the last ``days`` days.

    Returns:
        One ``ReadingResponse`` per record, with the date as an ISO string.

    Raises:
        HTTPException: 400 when there is no active user.
    """
    user = db.get_active_user()
    if not user:
        raise HTTPException(status_code=400, detail="请先设置活跃用户")

    payload = []
    for entry in db.get_readings(user_id=user.id, days=days):
        payload.append(
            ReadingResponse(
                id=entry.id,
                user_id=entry.user_id,
                date=entry.date.isoformat(),
                title=entry.title,
                author=entry.author,
                cover_url=entry.cover_url,
                duration=entry.duration,
                mood=entry.mood,
                notes=entry.notes,
            )
        )
    return payload
@app.post("/api/reading", response_model=ReadingResponse)
async def add_reading(reading_input: ReadingInput):
    """Create a reading record for the currently active user.

    Args:
        reading_input: Request payload; ``date`` is optional and defaults
            to today when empty.

    Returns:
        The stored record, re-read from the database, as a ``ReadingResponse``.

    Raises:
        HTTPException: 400 when there is no active user or the supplied
            date is not ISO formatted.
    """
    active_user = db.get_active_user()
    if not active_user:
        raise HTTPException(status_code=400, detail="请先设置活跃用户")

    if reading_input.date:
        # A malformed date is a client error; report it as 400 like the
        # other record endpoints instead of letting it become a 500.
        try:
            reading_date = date.fromisoformat(reading_input.date)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc
    else:
        reading_date = date.today()

    reading = Reading(
        date=reading_date,
        title=reading_input.title,
        author=reading_input.author,
        cover_url=reading_input.cover_url,
        duration=reading_input.duration,
        mood=reading_input.mood,
        notes=reading_input.notes,
    )
    reading_id = db.add_reading(reading, user_id=active_user.id)
    created = db.get_reading(reading_id)
    return ReadingResponse(
        id=created.id,
        user_id=created.user_id,
        date=created.date.isoformat(),
        title=created.title,
        author=created.author,
        cover_url=created.cover_url,
        duration=created.duration,
        mood=created.mood,
        notes=created.notes,
    )
@app.delete("/api/reading/{reading_id}")
async def delete_reading(reading_id: int):
    """Delete a reading record after confirming it exists.

    Raises:
        HTTPException: 404 when no record with ``reading_id`` is found.
    """
    existing = db.get_reading(reading_id)
    if existing:
        db.delete_reading(reading_id)
        return {"message": "阅读记录已删除"}
    raise HTTPException(status_code=404, detail="阅读记录不存在")
@app.get("/api/reading/today")
async def get_today_reading():
    """Return today's reading summary for the active user.

    Without an active user, an empty summary (zero duration, no book, no
    mood) is returned instead of an error.
    """
    user = db.get_active_user()
    if user:
        return db.get_today_reading(user_id=user.id)
    return {"duration": 0, "book": None, "mood": None}
@app.get("/api/reading/stats")
async def get_reading_stats(days: int = 30):
    """Return reading statistics for the active user over ``days`` days.

    Raises:
        HTTPException: 400 when there is no active user.
    """
    user = db.get_active_user()
    if user:
        return db.get_reading_stats(user_id=user.id, days=days)
    raise HTTPException(status_code=400, detail="请先设置活跃用户")
@app.get("/api/books/search")
async def search_books(q: str):
    """Search books through the OpenLibrary search API.

    Args:
        q: Search text; queries shorter than two characters after
            stripping return an empty result without a network call.

    Returns:
        ``{"books": [...]}`` with up to five entries, each holding
        ``title``, ``author``, ``cover_url`` and ``isbn``. On any failure
        the list is empty and an ``error`` message is included.
    """
    import json
    import ssl
    import urllib.parse
    import urllib.request

    query = q.strip() if q else ""
    if len(query) < 2:
        return {"books": []}
    try:
        encoded_q = urllib.parse.quote(query)
        url = f"https://openlibrary.org/search.json?q={encoded_q}&limit=5"
        # Keep certificate and hostname verification enabled: disabling it
        # would let anyone on the network path tamper with the results.
        ssl_context = ssl.create_default_context()
        req = urllib.request.Request(url, headers={"User-Agent": "Vitals/1.0"})
        with urllib.request.urlopen(req, timeout=5, context=ssl_context) as response:
            data = json.loads(response.read().decode())
        books = []
        for doc in data.get("docs", [])[:5]:
            cover_id = doc.get("cover_i")
            authors = doc.get("author_name")
            isbns = doc.get("isbn")
            books.append({
                "title": doc.get("title", ""),
                "author": authors[0] if authors else None,
                "cover_url": (
                    f"https://covers.openlibrary.org/b/id/{cover_id}-M.jpg"
                    if cover_id
                    else None
                ),
                "isbn": isbns[0] if isbns else None,
            })
        return {"books": books}
    except Exception as e:
        # Best-effort lookup: report the failure to the client rather than
        # failing the request.
        return {"books": [], "error": str(e)}
# ===== 用户管理 API =====
@app.get("/api/users", response_model=list[UserResponse])
async def get_users(current_user: User = Depends(require_user)):
    """List users; non-admin callers only receive their own entry.

    Returns:
        One ``UserResponse`` per user found. Falsy lookups are skipped.
    """
    if current_user.is_admin:
        candidates = db.get_users()
    else:
        candidates = [db.get_user(current_user.id)]

    payload = []
    for person in candidates:
        if not person:
            continue
        payload.append(
            UserResponse(
                id=person.id,
                name=person.name,
                created_at=person.created_at.isoformat(),
                is_active=person.is_active,
                gender=person.gender,
                height_cm=person.height_cm,
                weight_kg=person.weight_kg,
                age=person.age,
                bmi=person.bmi,
                bmi_status=person.bmi_status,
            )
        )
    return payload
@app.get("/api/users/active", response_model=UserResponse)
async def get_active_user():
    """Return the currently active user.

    Raises:
        HTTPException: 404 when no user is active.
    """
    active = db.get_active_user()
    if not active:
        raise HTTPException(status_code=404, detail="没有激活的用户")

    # Attributes copied verbatim; created_at is the only one reformatted.
    copied = ("id", "name", "is_active", "gender", "height_cm", "weight_kg", "age", "bmi", "bmi_status")
    fields = {attr: getattr(active, attr) for attr in copied}
    fields["created_at"] = active.created_at.isoformat()
    return UserResponse(**fields)
@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, current_user: User = Depends(require_user)):
    """Return one user; non-admin callers may only request themselves.

    Raises:
        HTTPException: 403 when a non-admin asks for another user, 404
            when the user does not exist.
    """
    is_self = user_id == current_user.id
    if not (current_user.is_admin or is_self):
        raise HTTPException(status_code=403, detail="无权查看其他用户")

    found = db.get_user(user_id)
    if not found:
        raise HTTPException(status_code=404, detail="用户不存在")

    response = UserResponse(
        id=found.id,
        name=found.name,
        created_at=found.created_at.isoformat(),
        is_active=found.is_active,
        gender=found.gender,
        height_cm=found.height_cm,
        weight_kg=found.weight_kg,
        age=found.age,
        bmi=found.bmi,
        bmi_status=found.bmi_status,
    )
    return response
@app.post("/api/users", response_model=UserResponse)
async def create_user(user_input: UserInput):
    """Create a user from the submitted profile and return the stored row.

    Args:
        user_input: Payload providing ``name``, ``gender``, ``height_cm``,
            ``weight_kg`` and ``age``.

    Returns:
        The new user, re-read from the database, as a ``UserResponse``.
    """
    profile = {
        "name": user_input.name,
        "gender": user_input.gender,
        "height_cm": user_input.height_cm,
        "weight_kg": user_input.weight_kg,
        "age": user_input.age,
    }
    new_id = db.add_user(User(**profile))
    stored = db.get_user(new_id)

    response = UserResponse(
        id=stored.id,
        name=stored.name,
        created_at=stored.created_at.isoformat(),
        is_active=stored.is_active,
        gender=stored.gender,
        height_cm=stored.height_cm,
        weight_kg=stored.weight_kg,
        age=stored.age,
        bmi=stored.bmi,
        bmi_status=stored.bmi_status,
    )
    return response
@app.put("/api/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_input: UserInput):
    """Overwrite a user's profile fields and return the refreshed record.

    Args:
        user_id: Identifier of the user to update, from the URL path.
        user_input: New values for ``name``, ``gender``, ``height_cm``,
            ``weight_kg`` and ``age``.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    target = db.get_user(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Copy each editable field from the payload onto the stored user.
    for attr in ("name", "gender", "height_cm", "weight_kg", "age"):
        setattr(target, attr, getattr(user_input, attr))
    db.update_user(target)

    refreshed = db.get_user(user_id)
    response = UserResponse(
        id=refreshed.id,
        name=refreshed.name,
        created_at=refreshed.created_at.isoformat(),
        is_active=refreshed.is_active,
        gender=refreshed.gender,
        height_cm=refreshed.height_cm,
        weight_kg=refreshed.weight_kg,
        age=refreshed.age,
        bmi=refreshed.bmi,
        bmi_status=refreshed.bmi_status,
    )
    return response
@app.delete("/api/users/{user_id}")
async def delete_user(user_id: int):
    """Delete a user that is not currently active.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the user
            is the active one.
    """
    target = db.get_user(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")
    if not target.is_active:
        db.delete_user(user_id)
        return {"message": "用户已删除"}
    raise HTTPException(status_code=400, detail="无法删除激活中的用户,请先切换到其他用户")
@app.post("/api/users/{user_id}/activate")
async def activate_user(user_id: int):
    """Mark the given user as the active one.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    target = db.get_user(user_id)
    if target:
        db.set_active_user(user_id)
        return {"message": f"已切换到用户: {target.name}"}
    raise HTTPException(status_code=404, detail="用户不存在")
# ===== 数据清除 API =====
@app.post("/api/data/preview-delete")
async def preview_delete_data(request: DataClearInput):
    """Preview how much data a clear operation would delete.

    Args:
        request: Payload with ``user_id``, ``mode``, optional ISO
            ``date_from`` / ``date_to`` strings, and ``data_types``.

    Returns:
        Whatever ``db.preview_delete`` reports for the given selection.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when a date
            is not ISO formatted.
    """
    # Make sure the target user exists
    user = db.get_user(request.user_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    # Convert the date strings; a malformed value is a client error, not a 500
    try:
        date_from = date.fromisoformat(request.date_from) if request.date_from else None
        date_to = date.fromisoformat(request.date_to) if request.date_to else None
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc
    # Fetch the preview counts
    counts = db.preview_delete(
        user_id=request.user_id,
        mode=request.mode,
        date_from=date_from,
        date_to=date_to,
        data_types=request.data_types,
    )
    return counts
@app.post("/api/data/clear")
async def clear_data(request: DataClearInput):
    """Clear a user's data according to the requested selection.

    Args:
        request: Payload with ``user_id``, ``mode``, optional ISO
            ``date_from`` / ``date_to`` strings, and ``data_types``.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when a date
            is not ISO formatted.
    """
    # Make sure the target user exists
    user = db.get_user(request.user_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    # Convert the date strings before deleting anything; a malformed value
    # is a client error, not a 500
    try:
        date_from = date.fromisoformat(request.date_from) if request.date_from else None
        date_to = date.fromisoformat(request.date_to) if request.date_to else None
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYY-MM-DD") from exc
    # Perform the deletion
    db.clear_data(
        user_id=request.user_id,
        mode=request.mode,
        date_from=date_from,
        date_to=date_to,
        data_types=request.data_types,
    )
    return {"message": "数据已清除"}
# ===== 移动端 H5 通用样式和组件 =====
def get_common_mobile_styles() -> str:
    """Return the CSS shared by the mobile (H5) pages.

    The stylesheet is a fixed string containing:

    - base touch behavior (no tap highlight, ``touch-action: manipulation``)
      and a 16px root font size;
    - a ``max-width: 768px`` breakpoint that shrinks the root font, hides
      ``.desktop-nav`` / ``.nav``, shows ``.mobile-nav`` and reserves 80px
      of bottom padding for it;
    - a ``min-width: 769px`` breakpoint that hides ``.mobile-nav`` again;
    - the ``.touch-target`` (44px minimum size) and ``.safe-area-bottom``
      helper classes.

    Returns:
        The CSS text, meant to be embedded in a page's style block.
    """
    return """
/* 移动端基础 */
* {
-webkit-tap-highlight-color: transparent;
touch-action: manipulation;
}
html {
font-size: 16px;
-webkit-text-size-adjust: 100%;
}
/* 响应式断点 */
@media (max-width: 768px) {
html { font-size: 14px; }
.container {
padding: 0 16px;
max-width: 100%;
}
/* 隐藏桌面导航 */
.desktop-nav, .nav { display: none !important; }
/* 显示移动端底部导航 */
.mobile-nav { display: flex !important; }
/* 内容区域留出底部导航空间 */
.main-content, body {
padding-bottom: 80px;
}
}
@media (min-width: 769px) {
.mobile-nav { display: none !important; }
.desktop-nav, .nav { display: flex; }
}
/* 触摸目标 */
.touch-target {
min-height: 44px;
min-width: 44px;
display: flex;
align-items: center;
justify-content: center;
}
/* 安全区域 */
.safe-area-bottom {
padding-bottom: env(safe-area-inset-bottom, 0);
}
"""
def get_mobile_nav_html(active_page: str = "", is_admin: bool = False) -> str:
"""生成移动端底部导航栏"""
def nav_item(href: str, icon: str, label: str, page: str) -> str:
active_class = "active" if active_page == page else ""
return f'''
'''
# SVG 图标路径
icons = {
"home": '
本月运动统计与趋势
| 日期 | 类型 | 时长 | 卡路里 | 距离 | 操作 |
|---|---|---|---|---|---|
| 加载中... | |||||
卡路里与营养摄入趋势
| 日期 | 餐次 | 食物 | 卡路里 | 蛋白质 | 操作 |
|---|---|---|---|---|---|
| 加载中... | |||||
睡眠时长与质量趋势
| 日期 | 时长 | 质量 | 入睡 | 起床 | 操作 |
|---|---|---|---|---|---|
| 加载中... | |||||
体重与体脂变化
| 日期 | 体重 | 体脂率 | 肌肉量 | 操作 |
|---|---|---|---|---|
| 加载中... | ||||
周报 / 月报导出与查看
报告支持一键生成并下载 PDF/PNG:
周报 PDF/PNG 与 月报 PDF/PNG
加载中...
管理用户档案和数据
编辑当前激活用户的基本信息,BMI 将根据身高和体重自动计算
当前登录账户管理