fix: MySQL 类型兼容性修复

- 添加 _parse_datetime, _parse_date, _parse_time 辅助函数
- 处理 MySQL 返回原生 datetime/date/time 类型而非字符串的情况
- 修改默认绑定地址为 0.0.0.0 以支持局域网访问

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-23 18:36:02 +08:00
parent 5860dccd60
commit 733d737413
2 changed files with 41 additions and 14 deletions

View File

@@ -228,7 +228,7 @@ def get_exercises(start_date: Optional[date] = None, end_date: Optional[date] =
return [
Exercise(
id=row["id"],
date=date.fromisoformat(row["date"]),
date=_parse_date(row["date"]),
type=row["type"],
duration=row["duration"],
calories=row["calories"],
@@ -293,7 +293,7 @@ def get_meals(start_date: Optional[date] = None, end_date: Optional[date] = None
return [
Meal(
id=row["id"],
date=date.fromisoformat(row["date"]),
date=_parse_date(row["date"]),
meal_type=row["meal_type"],
description=row["description"],
calories=row["calories"],
@@ -357,9 +357,9 @@ def get_sleep_records(start_date: Optional[date] = None, end_date: Optional[date
return [
Sleep(
id=row["id"],
date=date.fromisoformat(row["date"]),
bedtime=time.fromisoformat(row["bedtime"]) if row["bedtime"] else None,
wake_time=time.fromisoformat(row["wake_time"]) if row["wake_time"] else None,
date=_parse_date(row["date"]),
bedtime=_parse_time(row["bedtime"]),
wake_time=_parse_time(row["wake_time"]),
duration=row["duration"],
quality=row["quality"],
deep_sleep_mins=row["deep_sleep_mins"],
@@ -417,7 +417,7 @@ def get_weight_records(start_date: Optional[date] = None, end_date: Optional[dat
return [
Weight(
id=row["id"],
date=date.fromisoformat(row["date"]),
date=_parse_date(row["date"]),
weight_kg=row["weight_kg"],
body_fat_pct=row["body_fat_pct"],
muscle_mass=row["muscle_mass"],
@@ -502,13 +502,40 @@ def add_user(user: User) -> int:
return cursor.lastrowid
def _parse_datetime(value) -> datetime:
"""将字符串或 datetime 转换为 datetime 对象"""
if value is None:
return None
if isinstance(value, datetime):
return value
return datetime.fromisoformat(value)
def _parse_date(value) -> date:
"""将字符串或 date 转换为 date 对象"""
if value is None:
return None
if isinstance(value, date):
return value
return date.fromisoformat(value)
def _parse_time(value) -> time:
"""将字符串或 time 转换为 time 对象"""
if value is None:
return None
if isinstance(value, time):
return value
return time.fromisoformat(value)
def _row_to_user(row) -> User:
"""将数据库行转换为 User 对象"""
keys = row.keys()
return User(
id=row["id"],
name=row["name"],
created_at=datetime.fromisoformat(row["created_at"]),
created_at=_parse_datetime(row["created_at"]),
is_active=bool(row["is_active"]),
gender=row["gender"] if "gender" in keys else None,
height_cm=row["height_cm"] if "height_cm" in keys else None,
@@ -740,7 +767,7 @@ def get_readings(
Reading(
id=row["id"],
user_id=row["user_id"],
date=date.fromisoformat(row["date"]),
date=_parse_date(row["date"]),
title=row["title"],
author=row["author"],
cover_url=row["cover_url"],
@@ -761,7 +788,7 @@ def get_reading(reading_id: int) -> Optional[Reading]:
return Reading(
id=row["id"],
user_id=row["user_id"],
date=date.fromisoformat(row["date"]),
date=_parse_date(row["date"]),
title=row["title"],
author=row["author"],
cover_url=row["cover_url"],
@@ -894,8 +921,8 @@ def get_invites(created_by: Optional[int] = None) -> list[Invite]:
code=row["code"],
created_by=row["created_by"],
used_by=row["used_by"],
created_at=datetime.fromisoformat(row["created_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None,
created_at=_parse_datetime(row["created_at"]),
expires_at=_parse_datetime(row["expires_at"]),
)
for row in cursor.fetchall()
]
@@ -912,8 +939,8 @@ def get_invite_by_code(code: str) -> Optional[Invite]:
code=row["code"],
created_by=row["created_by"],
used_by=row["used_by"],
created_at=datetime.fromisoformat(row["created_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None,
created_at=_parse_datetime(row["created_at"]),
expires_at=_parse_datetime(row["expires_at"]),
)
return None

View File

@@ -8697,7 +8697,7 @@ def _export_report_file(report, format: str, prefix: str, exporter):
)
def run_server(host: str = "127.0.0.1", port: int = 8080):
def run_server(host: str = "0.0.0.0", port: int = 8080):
"""启动 Web 服务器"""
import uvicorn
uvicorn.run(app, host=host, port=port)