diff --git a/src/vitals/core/database.py b/src/vitals/core/database.py index 57d5d89..d722e5e 100644 --- a/src/vitals/core/database.py +++ b/src/vitals/core/database.py @@ -188,11 +188,10 @@ def init_db(): def add_exercise(exercise: Exercise, user_id: int = 1) -> int: """添加运动记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO exercise (user_id, date, type, duration, calories, distance, heart_rate_avg, source, raw_data, notes) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( user_id, exercise.date.isoformat(), @@ -210,19 +209,18 @@ def add_exercise(exercise: Exercise, user_id: int = 1) -> int: def get_exercises(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Exercise]: """查询运动记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): query = "SELECT * FROM exercise WHERE 1=1" params = [] if user_id: - query += " AND user_id = ?" + query += " AND user_id = %s" params.append(user_id) if start_date: - query += " AND date >= ?" + query += " AND date >= %s" params.append(start_date.isoformat()) if end_date: - query += " AND date <= ?" + query += " AND date <= %s" params.append(end_date.isoformat()) query += " ORDER BY date DESC" @@ -247,20 +245,18 @@ def get_exercises(start_date: Optional[date] = None, end_date: Optional[date] = def delete_exercise(exercise_id: int): """删除运动记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM exercise WHERE id = ?", (exercise_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM exercise WHERE id = %s", (exercise_id,)) # ===== 饮食记录 ===== def add_meal(meal: Meal, user_id: int = 1) -> int: """添加饮食记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO meal (user_id, date, meal_type, description, calories, protein, carbs, fat, photo_path, food_items) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( user_id, meal.date.isoformat(), @@ -278,19 +274,18 @@ def add_meal(meal: Meal, user_id: int = 1) -> int: def get_meals(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Meal]: """查询饮食记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): query = "SELECT * FROM meal WHERE 1=1" params = [] if user_id: - query += " AND user_id = ?" + query += " AND user_id = %s" params.append(user_id) if start_date: - query += " AND date >= ?" + query += " AND date >= %s" params.append(start_date.isoformat()) if end_date: - query += " AND date <= ?" + query += " AND date <= %s" params.append(end_date.isoformat()) query += " ORDER BY date DESC, meal_type" @@ -315,20 +310,18 @@ def get_meals(start_date: Optional[date] = None, end_date: Optional[date] = None def delete_meal(meal_id: int): """删除饮食记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM meal WHERE id = ?", (meal_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM meal WHERE id = %s", (meal_id,)) # ===== 睡眠记录 ===== def add_sleep(sleep_record: Sleep, user_id: int = 1) -> int: """添加睡眠记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO sleep (user_id, date, bedtime, wake_time, duration, quality, deep_sleep_mins, source, notes) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( user_id, sleep_record.date.isoformat(), @@ -345,19 +338,18 @@ def add_sleep(sleep_record: Sleep, user_id: int = 1) -> int: def get_sleep_records(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Sleep]: """查询睡眠记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): query = "SELECT * FROM sleep WHERE 1=1" params = [] if user_id: - query += " AND user_id = ?" + query += " AND user_id = %s" params.append(user_id) if start_date: - query += " AND date >= ?" + query += " AND date >= %s" params.append(start_date.isoformat()) if end_date: - query += " AND date <= ?" + query += " AND date <= %s" params.append(end_date.isoformat()) query += " ORDER BY date DESC" @@ -381,20 +373,18 @@ def get_sleep_records(start_date: Optional[date] = None, end_date: Optional[date def delete_sleep(sleep_id: int): """删除睡眠记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM sleep WHERE id = ?", (sleep_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM sleep WHERE id = %s", (sleep_id,)) # ===== 体重记录 ===== def add_weight(weight_record: Weight, user_id: int = 1) -> int: """添加体重记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO weight (user_id, date, weight_kg, body_fat_pct, muscle_mass, notes) - VALUES (?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s) """, ( user_id, weight_record.date.isoformat(), @@ -408,19 +398,18 @@ def add_weight(weight_record: Weight, user_id: int = 1) -> int: def get_weight_records(start_date: Optional[date] = None, end_date: Optional[date] = None, user_id: Optional[int] = None) -> list[Weight]: """查询体重记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): query = "SELECT * FROM weight WHERE 1=1" params = [] if user_id: - query += " AND user_id = ?" + query += " AND user_id = %s" params.append(user_id) if start_date: - query += " AND date >= ?" + query += " AND date >= %s" params.append(start_date.isoformat()) if end_date: - query += " AND date <= ?" + query += " AND date <= %s" params.append(end_date.isoformat()) query += " ORDER BY date DESC" @@ -447,18 +436,16 @@ def get_latest_weight() -> Optional[Weight]: def delete_weight(weight_id: int): """删除体重记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM weight WHERE id = ?", (weight_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM weight WHERE id = %s", (weight_id,)) # ===== 用户配置 ===== def get_config() -> UserConfig: """获取用户配置""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT key, value FROM config") + with get_connection() as (conn, cursor): + cursor.execute("SELECT `key`, value FROM config") rows = cursor.fetchall() config_dict = {row["key"]: row["value"] for row in rows} @@ -475,8 +462,7 @@ def get_config() -> UserConfig: def save_config(config: UserConfig): """保存用户配置""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): config_dict = { "age": str(config.age) if config.age else None, "gender": config.gender, @@ -489,7 +475,7 @@ def save_config(config: UserConfig): for key, value in config_dict.items(): if value is not None: cursor.execute(""" - INSERT OR REPLACE INTO config (key, value) VALUES (?, ?) + REPLACE INTO config (`key`, value) VALUES (%s, %s) """, (key, value)) @@ -497,11 +483,10 @@ def save_config(config: UserConfig): def add_user(user: User) -> int: """添加用户""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO users (name, created_at, is_active, gender, height_cm, weight_kg, age, password_hash, email, is_admin, is_disabled) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( user.name, user.created_at.isoformat(), @@ -539,38 +524,34 @@ def _row_to_user(row) -> User: def get_users() -> list[User]: """获取所有用户""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute("SELECT * FROM users ORDER BY id") return [_row_to_user(row) for row in cursor.fetchall()] def get_user(user_id: int) -> Optional[User]: """按 ID 获取用户""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) + with get_connection() as (conn, cursor): + cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) row = cursor.fetchone() return _row_to_user(row) if row else None def get_user_by_name(name: str) -> Optional[User]: """按用户名获取用户(用于登录)""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM users WHERE name = ?", (name,)) + with get_connection() as (conn, cursor): + cursor.execute("SELECT * FROM users WHERE name = %s", (name,)) row = cursor.fetchone() return _row_to_user(row) if row else None def update_user(user: User): """更新用户""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" - UPDATE users SET name = ?, is_active = ?, gender = ?, height_cm = ?, weight_kg = ?, age = ?, - password_hash = ?, email = ?, is_admin = ?, is_disabled = ? - WHERE id = ? + UPDATE users SET name = %s, is_active = %s, gender = %s, height_cm = %s, weight_kg = %s, age = %s, + password_hash = %s, email = %s, is_admin = %s, is_disabled = %s + WHERE id = %s """, ( user.name, 1 if user.is_active else 0, user.gender, user.height_cm, user.weight_kg, user.age, user.password_hash, user.email, 1 if user.is_admin else 0, 1 if user.is_disabled else 0, @@ -580,23 +561,20 @@ def update_user(user: User): def delete_user(user_id: int): """删除用户""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM users WHERE id = ?", (user_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) def set_active_user(user_id: int): """设置激活用户(同时取消其他用户的激活状态)""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute("UPDATE users SET is_active = 0") - cursor.execute("UPDATE users SET is_active = 1 WHERE id = ?", (user_id,)) + cursor.execute("UPDATE users SET is_active = 1 WHERE id = %s", (user_id,)) def get_active_user() -> Optional[User]: """获取当前激活的用户""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute("SELECT * FROM users WHERE is_active = 1") row = cursor.fetchone() return _row_to_user(row) if row else None @@ -604,9 +582,7 @@ def get_active_user() -> Optional[User]: def ensure_default_user(): """确保存在默认用户,并将无 user_id 的数据关联到默认用户""" - with get_connection() as conn: - cursor = conn.cursor() - + with get_connection() as (conn, cursor): # 检查是否已有用户 cursor.execute("SELECT COUNT(*) as count FROM users") count = cursor.fetchone()["count"] @@ -615,7 +591,7 @@ def ensure_default_user(): # 创建默认用户 cursor.execute(""" INSERT INTO users (name, created_at, is_active) - VALUES (?, ?, 1) + VALUES (%s, %s, 1) """, ("默认用户", datetime.now().isoformat())) # 获取激活用户(如果没有则设置第一个用户为激活) @@ -625,23 +601,16 @@ def ensure_default_user(): cursor.execute("SELECT id FROM users ORDER BY id LIMIT 1") first = cursor.fetchone() if first: - cursor.execute("UPDATE users SET is_active = 1 WHERE id = ?", (first["id"],)) + cursor.execute("UPDATE users SET is_active = 1 WHERE id = %s", (first["id"],)) active = first if active: default_user_id = active["id"] - # 迁移现有数据(将 user_id 为 NULL 或不存在的记录关联到默认用户) + # 迁移现有数据(将 user_id 为 NULL 的记录关联到默认用户) for table in ["exercise", "meal", "sleep", "weight"]: - # 检查表是否有 user_id 列 - cursor.execute(f"PRAGMA table_info({table})") - columns = [col["name"] for col in cursor.fetchall()] - if "user_id" not in columns: - # 添加 user_id 列 - cursor.execute(f"ALTER TABLE {table} ADD COLUMN user_id INTEGER DEFAULT 1") - # 更新 NULL 的 user_id - cursor.execute(f"UPDATE {table} SET user_id = ? WHERE user_id IS NULL", (default_user_id,)) + cursor.execute(f"UPDATE {table} SET user_id = %s WHERE user_id IS NULL", (default_user_id,)) def migrate_auth_fields(): @@ -693,18 +662,17 @@ def preview_delete( counts = {} total = 0 - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): for table in ["exercise", "meal", "sleep", "weight"]: if mode == "type" and data_types and table not in data_types: counts[table] = 0 continue - query = f"SELECT COUNT(*) as count FROM {table} WHERE user_id = ?" + query = f"SELECT COUNT(*) as count FROM {table} WHERE user_id = %s" params = [user_id] if mode == "range" and date_from and date_to: - query += " AND date >= ? AND date <= ?" + query += " AND date >= %s AND date <= %s" params.extend([date_from.isoformat(), date_to.isoformat()]) cursor.execute(query, params) @@ -728,14 +696,13 @@ def clear_data( if mode == "type" and data_types: tables = [t for t in tables if t in data_types] - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): for table in tables: - query = f"DELETE FROM {table} WHERE user_id = ?" + query = f"DELETE FROM {table} WHERE user_id = %s" params = [user_id] if mode == "range" and date_from and date_to: - query += " AND date >= ? AND date <= ?" + query += " AND date >= %s AND date <= %s" params.extend([date_from.isoformat(), date_to.isoformat()]) cursor.execute(query, params) @@ -745,11 +712,10 @@ def clear_data( def add_reading(reading: Reading, user_id: int = 1) -> int: """添加阅读记录""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO reading (user_id, date, title, author, cover_url, duration, mood, notes) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """, ( user_id, reading.date.isoformat(), @@ -770,19 +736,18 @@ def get_readings( days: Optional[int] = None, ) -> list[Reading]: """获取阅读记录""" - with get_connection() as conn: - cursor = conn.cursor() - query = "SELECT * FROM reading WHERE user_id = ?" + with get_connection() as (conn, cursor): + query = "SELECT * FROM reading WHERE user_id = %s" params: list = [user_id] if days: start_date = date.today() - timedelta(days=days) if start_date: - query += " AND date >= ?" + query += " AND date >= %s" params.append(start_date.isoformat()) if end_date: - query += " AND date <= ?" + query += " AND date <= %s" params.append(end_date.isoformat()) query += " ORDER BY date DESC, id DESC" @@ -806,9 +771,8 @@ def get_readings( def get_reading(reading_id: int) -> Optional[Reading]: """获取单条阅读记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM reading WHERE id = ?", (reading_id,)) + with get_connection() as (conn, cursor): + cursor.execute("SELECT * FROM reading WHERE id = %s", (reading_id,)) row = cursor.fetchone() if row: return Reading( @@ -827,22 +791,20 @@ def get_reading(reading_id: int) -> Optional[Reading]: def delete_reading(reading_id: int): """删除阅读记录""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM reading WHERE id = ?", (reading_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM reading WHERE id = %s", (reading_id,)) def get_reading_stats(user_id: int = 1, days: int = 30) -> dict: """获取阅读统计""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): start_date = date.today() - timedelta(days=days) # 每日阅读时长 cursor.execute(""" SELECT date, SUM(duration) as total_duration FROM reading - WHERE user_id = ? AND date >= ? + WHERE user_id = %s AND date >= %s GROUP BY date ORDER BY date """, (user_id, start_date.isoformat())) @@ -852,7 +814,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict: cursor.execute(""" SELECT mood, COUNT(*) as count FROM reading - WHERE user_id = ? AND date >= ? AND mood IS NOT NULL + WHERE user_id = %s AND date >= %s AND mood IS NOT NULL GROUP BY mood """, (user_id, start_date.isoformat())) mood_distribution = {row["mood"]: row["count"] for row in cursor.fetchall()} @@ -864,7 +826,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict: COUNT(*) as reading_count, MAX(date) as last_read FROM reading - WHERE user_id = ? + WHERE user_id = %s GROUP BY title ORDER BY last_read DESC """, (user_id,)) @@ -884,7 +846,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict: cursor.execute(""" SELECT SUM(duration) as total, COUNT(DISTINCT title) as book_count FROM reading - WHERE user_id = ? AND date >= ? + WHERE user_id = %s AND date >= %s """, (user_id, start_date.isoformat())) totals = cursor.fetchone() @@ -899,8 +861,7 @@ def get_reading_stats(user_id: int = 1, days: int = 30) -> dict: def get_today_reading(user_id: int = 1) -> dict: """获取今日阅读摘要""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): today = date.today().isoformat() cursor.execute(""" @@ -908,7 +869,7 @@ def get_today_reading(user_id: int = 1) -> dict: MAX(title) as last_book, MAX(mood) as last_mood FROM reading - WHERE user_id = ? AND date = ? + WHERE user_id = %s AND date = %s """, (user_id, today)) row = cursor.fetchone() @@ -923,11 +884,10 @@ def get_today_reading(user_id: int = 1) -> dict: def add_invite(invite: Invite) -> int: """添加邀请码""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): cursor.execute(""" INSERT INTO invites (code, created_by, used_by, created_at, expires_at) - VALUES (?, ?, ?, ?, ?) + VALUES (%s, %s, %s, %s, %s) """, ( invite.code, invite.created_by, @@ -940,10 +900,9 @@ def add_invite(invite: Invite) -> int: def get_invites(created_by: Optional[int] = None) -> list[Invite]: """获取邀请码列表""" - with get_connection() as conn: - cursor = conn.cursor() + with get_connection() as (conn, cursor): if created_by: - cursor.execute("SELECT * FROM invites WHERE created_by = ? ORDER BY created_at DESC", (created_by,)) + cursor.execute("SELECT * FROM invites WHERE created_by = %s ORDER BY created_at DESC", (created_by,)) else: cursor.execute("SELECT * FROM invites ORDER BY created_at DESC") return [ @@ -961,9 +920,8 @@ def get_invites(created_by: Optional[int] = None) -> list[Invite]: def get_invite_by_code(code: str) -> Optional[Invite]: """按邀请码查询""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM invites WHERE code = ?", (code,)) + with get_connection() as (conn, cursor): + cursor.execute("SELECT * FROM invites WHERE code = %s", (code,)) row = cursor.fetchone() if row: return Invite( @@ -979,13 +937,11 @@ def get_invite_by_code(code: str) -> Optional[Invite]: def mark_invite_used(invite_id: int, user_id: int): """标记邀请码已使用""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("UPDATE invites SET used_by = ? WHERE id = ?", (user_id, invite_id)) + with get_connection() as (conn, cursor): + cursor.execute("UPDATE invites SET used_by = %s WHERE id = %s", (user_id, invite_id)) def delete_invite(invite_id: int): """删除邀请码""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM invites WHERE id = ?", (invite_id,)) + with get_connection() as (conn, cursor): + cursor.execute("DELETE FROM invites WHERE id = %s", (invite_id,))