refactor: 所有 SQL 查询改为 MySQL 语法

This commit is contained in:
2026-01-23 17:56:16 +08:00
parent 49abeaeff6
commit ef43b1bc57

View File

@@ -188,11 +188,10 @@ def init_db():
def add_exercise(exercise: Exercise, user_id: int = 1) -> int:
"""添加运动记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO exercise (user_id, date, type, duration, calories, distance, heart_rate_avg, source, raw_data, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
user_id,
exercise.date.isoformat(),
@@ -210,19 +209,18 @@ def add_exercise(exercise: Exercise, user_id: int = 1) -> int:
def get_exercises(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Exercise]:
"""查询运动记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
query = "SELECT * FROM exercise WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
query += " AND user_id = %s"
params.append(user_id)
if start_date:
query += " AND date >= ?"
query += " AND date >= %s"
params.append(start_date.isoformat())
if end_date:
query += " AND date <= ?"
query += " AND date <= %s"
params.append(end_date.isoformat())
query += " ORDER BY date DESC"
@@ -247,20 +245,18 @@ def get_exercises(start_date: Optional[date] = None, end_date: Optional[date] =
def delete_exercise(exercise_id: int):
"""删除运动记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM exercise WHERE id = ?", (exercise_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM exercise WHERE id = %s", (exercise_id,))
# ===== 饮食记录 =====
def add_meal(meal: Meal, user_id: int = 1) -> int:
"""添加饮食记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO meal (user_id, date, meal_type, description, calories, protein, carbs, fat, photo_path, food_items)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
user_id,
meal.date.isoformat(),
@@ -278,19 +274,18 @@ def add_meal(meal: Meal, user_id: int = 1) -> int:
def get_meals(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Meal]:
"""查询饮食记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
query = "SELECT * FROM meal WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
query += " AND user_id = %s"
params.append(user_id)
if start_date:
query += " AND date >= ?"
query += " AND date >= %s"
params.append(start_date.isoformat())
if end_date:
query += " AND date <= ?"
query += " AND date <= %s"
params.append(end_date.isoformat())
query += " ORDER BY date DESC, meal_type"
@@ -315,20 +310,18 @@ def get_meals(start_date: Optional[date] = None, end_date: Optional[date] = None
def delete_meal(meal_id: int):
"""删除饮食记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM meal WHERE id = ?", (meal_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM meal WHERE id = %s", (meal_id,))
# ===== 睡眠记录 =====
def add_sleep(sleep_record: Sleep, user_id: int = 1) -> int:
"""添加睡眠记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO sleep (user_id, date, bedtime, wake_time, duration, quality, deep_sleep_mins, source, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
user_id,
sleep_record.date.isoformat(),
@@ -345,19 +338,18 @@ def add_sleep(sleep_record: Sleep, user_id: int = 1) -> int:
def get_sleep_records(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Sleep]:
"""查询睡眠记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
query = "SELECT * FROM sleep WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
query += " AND user_id = %s"
params.append(user_id)
if start_date:
query += " AND date >= ?"
query += " AND date >= %s"
params.append(start_date.isoformat())
if end_date:
query += " AND date <= ?"
query += " AND date <= %s"
params.append(end_date.isoformat())
query += " ORDER BY date DESC"
@@ -381,20 +373,18 @@ def get_sleep_records(start_date: Optional[date] = None, end_date: Optional[date
def delete_sleep(sleep_id: int):
"""删除睡眠记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM sleep WHERE id = ?", (sleep_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM sleep WHERE id = %s", (sleep_id,))
# ===== 体重记录 =====
def add_weight(weight_record: Weight, user_id: int = 1) -> int:
"""添加体重记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO weight (user_id, date, weight_kg, body_fat_pct, muscle_mass, notes)
VALUES (?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
user_id,
weight_record.date.isoformat(),
@@ -408,19 +398,18 @@ def add_weight(weight_record: Weight, user_id: int = 1) -> int:
def get_weight_records(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Weight]:
"""查询体重记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
query = "SELECT * FROM weight WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
query += " AND user_id = %s"
params.append(user_id)
if start_date:
query += " AND date >= ?"
query += " AND date >= %s"
params.append(start_date.isoformat())
if end_date:
query += " AND date <= ?"
query += " AND date <= %s"
params.append(end_date.isoformat())
query += " ORDER BY date DESC"
@@ -447,18 +436,16 @@ def get_latest_weight() -> Optional[Weight]:
def delete_weight(weight_id: int):
"""删除体重记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM weight WHERE id = ?", (weight_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM weight WHERE id = %s", (weight_id,))
# ===== 用户配置 =====
def get_config() -> UserConfig:
"""获取用户配置"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT key, value FROM config")
with get_connection() as (conn, cursor):
cursor.execute("SELECT `key`, value FROM config")
rows = cursor.fetchall()
config_dict = {row["key"]: row["value"] for row in rows}
@@ -475,8 +462,7 @@ def get_config() -> UserConfig:
def save_config(config: UserConfig):
"""保存用户配置"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
config_dict = {
"age": str(config.age) if config.age else None,
"gender": config.gender,
@@ -489,7 +475,7 @@ def save_config(config: UserConfig):
for key, value in config_dict.items():
if value is not None:
cursor.execute("""
INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)
REPLACE INTO config (`key`, value) VALUES (%s, %s)
""", (key, value))
@@ -497,11 +483,10 @@ def save_config(config: UserConfig):
def add_user(user: User) -> int:
"""添加用户"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO users (name, created_at, is_active, gender, height_cm, weight_kg, age, password_hash, email, is_admin, is_disabled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
user.name,
user.created_at.isoformat(),
@@ -539,38 +524,34 @@ def _row_to_user(row) -> User:
def get_users() -> list[User]:
"""获取所有用户"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM users ORDER BY id")
return [_row_to_user(row) for row in cursor.fetchall()]
def get_user(user_id: int) -> Optional[User]:
"""按 ID 获取用户"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
row = cursor.fetchone()
return _row_to_user(row) if row else None
def get_user_by_name(name: str) -> Optional[User]:
"""按用户名获取用户(用于登录)"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE name = ?", (name,))
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM users WHERE name = %s", (name,))
row = cursor.fetchone()
return _row_to_user(row) if row else None
def update_user(user: User):
"""更新用户"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
UPDATE users SET name = ?, is_active = ?, gender = ?, height_cm = ?, weight_kg = ?, age = ?,
password_hash = ?, email = ?, is_admin = ?, is_disabled = ?
WHERE id = ?
UPDATE users SET name = %s, is_active = %s, gender = %s, height_cm = %s, weight_kg = %s, age = %s,
password_hash = %s, email = %s, is_admin = %s, is_disabled = %s
WHERE id = %s
""", (
user.name, 1 if user.is_active else 0, user.gender, user.height_cm, user.weight_kg, user.age,
user.password_hash, user.email, 1 if user.is_admin else 0, 1 if user.is_disabled else 0,
@@ -580,23 +561,20 @@ def update_user(user: User):
def delete_user(user_id: int):
"""删除用户"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
def set_active_user(user_id: int):
"""设置激活用户(同时取消其他用户的激活状态)"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("UPDATE users SET is_active = 0")
cursor.execute("UPDATE users SET is_active = 1 WHERE id = ?", (user_id,))
cursor.execute("UPDATE users SET is_active = 1 WHERE id = %s", (user_id,))
def get_active_user() -> Optional[User]:
"""获取当前激活的用户"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM users WHERE is_active = 1")
row = cursor.fetchone()
return _row_to_user(row) if row else None
@@ -604,9 +582,7 @@ def get_active_user() -> Optional[User]:
def ensure_default_user():
"""确保存在默认用户,并将无 user_id 的数据关联到默认用户"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
# 检查是否已有用户
cursor.execute("SELECT COUNT(*) as count FROM users")
count = cursor.fetchone()["count"]
@@ -615,7 +591,7 @@ def ensure_default_user():
# 创建默认用户
cursor.execute("""
INSERT INTO users (name, created_at, is_active)
VALUES (?, ?, 1)
VALUES (%s, %s, 1)
""", ("默认用户", datetime.now().isoformat()))
# 获取激活用户(如果没有则设置第一个用户为激活)
@@ -625,23 +601,16 @@ def ensure_default_user():
cursor.execute("SELECT id FROM users ORDER BY id LIMIT 1")
first = cursor.fetchone()
if first:
cursor.execute("UPDATE users SET is_active = 1 WHERE id = ?", (first["id"],))
cursor.execute("UPDATE users SET is_active = 1 WHERE id = %s", (first["id"],))
active = first
if active:
default_user_id = active["id"]
# 迁移现有数据(将 user_id 为 NULL 或不存在的记录关联到默认用户)
# 迁移现有数据(将 user_id 为 NULL 的记录关联到默认用户)
for table in ["exercise", "meal", "sleep", "weight"]:
# 检查表是否有 user_id 列
cursor.execute(f"PRAGMA table_info({table})")
columns = [col["name"] for col in cursor.fetchall()]
if "user_id" not in columns:
# 添加 user_id 列
cursor.execute(f"ALTER TABLE {table} ADD COLUMN user_id INTEGER DEFAULT 1")
# 更新 NULL 的 user_id
cursor.execute(f"UPDATE {table} SET user_id = ? WHERE user_id IS NULL", (default_user_id,))
cursor.execute(f"UPDATE {table} SET user_id = %s WHERE user_id IS NULL", (default_user_id,))
def migrate_auth_fields():
@@ -693,18 +662,17 @@ def preview_delete(
counts = {}
total = 0
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
for table in ["exercise", "meal", "sleep", "weight"]:
if mode == "type" and data_types and table not in data_types:
counts[table] = 0
continue
query = f"SELECT COUNT(*) as count FROM {table} WHERE user_id = ?"
query = f"SELECT COUNT(*) as count FROM {table} WHERE user_id = %s"
params = [user_id]
if mode == "range" and date_from and date_to:
query += " AND date >= ? AND date <= ?"
query += " AND date >= %s AND date <= %s"
params.extend([date_from.isoformat(), date_to.isoformat()])
cursor.execute(query, params)
@@ -728,14 +696,13 @@ def clear_data(
if mode == "type" and data_types:
tables = [t for t in tables if t in data_types]
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
for table in tables:
query = f"DELETE FROM {table} WHERE user_id = ?"
query = f"DELETE FROM {table} WHERE user_id = %s"
params = [user_id]
if mode == "range" and date_from and date_to:
query += " AND date >= ? AND date <= ?"
query += " AND date >= %s AND date <= %s"
params.extend([date_from.isoformat(), date_to.isoformat()])
cursor.execute(query, params)
@@ -745,11 +712,10 @@ def clear_data(
def add_reading(reading: Reading, user_id: int = 1) -> int:
"""添加阅读记录"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO reading (user_id, date, title, author, cover_url, duration, mood, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
user_id,
reading.date.isoformat(),
@@ -770,19 +736,18 @@ def get_readings(
days: Optional[int] = None,
) -> list[Reading]:
"""获取阅读记录"""
with get_connection() as conn:
cursor = conn.cursor()
query = "SELECT * FROM reading WHERE user_id = ?"
with get_connection() as (conn, cursor):
query = "SELECT * FROM reading WHERE user_id = %s"
params: list = [user_id]
if days:
start_date = date.today() - timedelta(days=days)
if start_date:
query += " AND date >= ?"
query += " AND date >= %s"
params.append(start_date.isoformat())
if end_date:
query += " AND date <= ?"
query += " AND date <= %s"
params.append(end_date.isoformat())
query += " ORDER BY date DESC, id DESC"
@@ -806,9 +771,8 @@ def get_readings(
def get_reading(reading_id: int) -> Optional[Reading]:
"""获取单条阅读记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM reading WHERE id = ?", (reading_id,))
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM reading WHERE id = %s", (reading_id,))
row = cursor.fetchone()
if row:
return Reading(
@@ -827,22 +791,20 @@ def get_reading(reading_id: int) -> Optional[Reading]:
def delete_reading(reading_id: int):
"""删除阅读记录"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM reading WHERE id = ?", (reading_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM reading WHERE id = %s", (reading_id,))
def get_reading_stats(user_id: int = 1, days: int = 30) -> dict:
"""获取阅读统计"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
start_date = date.today() - timedelta(days=days)
# 每日阅读时长
cursor.execute("""
SELECT date, SUM(duration) as total_duration
FROM reading
WHERE user_id = ? AND date >= ?
WHERE user_id = %s AND date >= %s
GROUP BY date
ORDER BY date
""", (user_id, start_date.isoformat()))
@@ -852,7 +814,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict:
cursor.execute("""
SELECT mood, COUNT(*) as count
FROM reading
WHERE user_id = ? AND date >= ? AND mood IS NOT NULL
WHERE user_id = %s AND date >= %s AND mood IS NOT NULL
GROUP BY mood
""", (user_id, start_date.isoformat()))
mood_distribution = {row["mood"]: row["count"] for row in cursor.fetchall()}
@@ -864,7 +826,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict:
COUNT(*) as reading_count,
MAX(date) as last_read
FROM reading
WHERE user_id = ?
WHERE user_id = %s
GROUP BY title
ORDER BY last_read DESC
""", (user_id,))
@@ -884,7 +846,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict:
cursor.execute("""
SELECT SUM(duration) as total, COUNT(DISTINCT title) as book_count
FROM reading
WHERE user_id = ? AND date >= ?
WHERE user_id = %s AND date >= %s
""", (user_id, start_date.isoformat()))
totals = cursor.fetchone()
@@ -899,8 +861,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict:
def get_today_reading(user_id: int = 1) -> dict:
"""获取今日阅读摘要"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
today = date.today().isoformat()
cursor.execute("""
@@ -908,7 +869,7 @@ def get_today_reading(user_id: int = 1) -> dict:
MAX(title) as last_book,
MAX(mood) as last_mood
FROM reading
WHERE user_id = ? AND date = ?
WHERE user_id = %s AND date = %s
""", (user_id, today))
row = cursor.fetchone()
@@ -923,11 +884,10 @@ def get_today_reading(user_id: int = 1) -> dict:
def add_invite(invite: Invite) -> int:
"""添加邀请码"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
cursor.execute("""
INSERT INTO invites (code, created_by, used_by, created_at, expires_at)
VALUES (?, ?, ?, ?, ?)
VALUES (%s, %s, %s, %s, %s)
""", (
invite.code,
invite.created_by,
@@ -940,10 +900,9 @@ def add_invite(invite: Invite) -> int:
def get_invites(created_by: Optional[int] = None) -> list[Invite]:
"""获取邀请码列表"""
with get_connection() as conn:
cursor = conn.cursor()
with get_connection() as (conn, cursor):
if created_by:
cursor.execute("SELECT * FROM invites WHERE created_by = ? ORDER BY created_at DESC", (created_by,))
cursor.execute("SELECT * FROM invites WHERE created_by = %s ORDER BY created_at DESC", (created_by,))
else:
cursor.execute("SELECT * FROM invites ORDER BY created_at DESC")
return [
@@ -961,9 +920,8 @@ def get_invites(created_by: Optional[int] = None) -> list[Invite]:
def get_invite_by_code(code: str) -> Optional[Invite]:
"""按邀请码查询"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM invites WHERE code = ?", (code,))
with get_connection() as (conn, cursor):
cursor.execute("SELECT * FROM invites WHERE code = %s", (code,))
row = cursor.fetchone()
if row:
return Invite(
@@ -979,13 +937,11 @@ def get_invite_by_code(code: str) -> Optional[Invite]:
def mark_invite_used(invite_id: int, user_id: int):
"""标记邀请码已使用"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE invites SET used_by = ? WHERE id = ?", (user_id, invite_id))
with get_connection() as (conn, cursor):
cursor.execute("UPDATE invites SET used_by = %s WHERE id = %s", (user_id, invite_id))
def delete_invite(invite_id: int):
"""删除邀请码"""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM invites WHERE id = ?", (invite_id,))
with get_connection() as (conn, cursor):
cursor.execute("DELETE FROM invites WHERE id = %s", (invite_id,))