""" SQLite 数据库模块 边缘AI推理服务的本地数据存储 特性: - WAL 模式(Write-Ahead Logging)提升写入性能 - 异步写入策略 - 滚动清理机制(保留7天数据) """ import os import json import sqlite3 import threading import queue import time import logging from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Generator def _normalize_coordinates(coords): """将坐标统一为 [[x,y],...] 格式,兼容 [{'x':..,'y':..},...] 格式""" if isinstance(coords, str): try: coords = eval(coords) except: return coords if isinstance(coords, list) and coords and isinstance(coords[0], dict): return [[p.get("x", 0), p.get("y", 0)] for p in coords] return coords from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) @dataclass class StorageConfig: """存储配置类""" db_path: str = "./data/security_events.db" image_dir: str = "./data/captures" retention_days: int = 7 wal_mode: bool = True batch_size: int = 100 flush_interval: float = 5.0 @dataclass class AlertRecord: """告警记录""" alert_id: str camera_id: str roi_id: str bind_id: Optional[str] = None # 关联 roi_algo_bind 表 alert_type: str = "detection" target_class: Optional[str] = None confidence: Optional[float] = None bbox: Optional[List[float]] = None message: Optional[str] = None image_path: Optional[str] = None status: str = "pending" created_at: datetime = field(default_factory=datetime.now) processed_at: Optional[datetime] = None duration_minutes: Optional[float] = None detections: Optional[str] = None # JSON格式的检测结果 class SQLiteManager: """SQLite 数据库管理器""" _instance = None _lock = threading.Lock() def __new__(cls, config: Optional[StorageConfig] = None): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, config: Optional[StorageConfig] = None): if self._initialized: return if config is None: config = StorageConfig() self.config = config self._conn: Optional[sqlite3.Connection] = None self._write_queue: queue.Queue = queue.Queue() self._running = False self._write_thread: Optional[threading.Thread] = None self._cleanup_thread: Optional[threading.Thread] = None self._init_directories() self._init_database() self._start_background_threads() self._initialized = True logger.info(f"SQLite 数据库初始化成功: {config.db_path}") def _init_directories(self): """初始化目录""" Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True) Path(self.config.image_dir).mkdir(parents=True, exist_ok=True) def _init_database(self): """初始化数据库表""" self._conn = sqlite3.connect( self.config.db_path, check_same_thread=False, timeout=30.0 ) if self.config.wal_mode: cursor = self._conn.cursor() cursor.execute("PRAGMA journal_mode=WAL;") cursor.execute("PRAGMA synchronous=NORMAL;") cursor.execute("PRAGMA cache_size=-64000;") self._conn.commit() cursor = self._conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS algorithm_registry ( algo_code TEXT PRIMARY KEY, algo_name TEXT NOT NULL, target_class TEXT DEFAULT 'person', param_schema TEXT, description TEXT, is_active BOOLEAN DEFAULT 1, created_at TEXT, updated_at TEXT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_algo_active ON algorithm_registry(is_active) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS roi_configs ( roi_id TEXT PRIMARY KEY, camera_id TEXT NOT NULL, roi_type TEXT NOT NULL, coordinates TEXT NOT NULL, enabled BOOLEAN DEFAULT 1, priority INTEGER DEFAULT 0, extra_params TEXT, updated_at TEXT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_roi_camera ON roi_configs(camera_id) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS roi_algo_bind ( bind_id TEXT PRIMARY KEY, roi_id TEXT NOT NULL, algo_code TEXT NOT NULL, params TEXT NOT NULL, priority INTEGER DEFAULT 0, enabled BOOLEAN DEFAULT 1, created_at TEXT, updated_at TEXT, FOREIGN KEY (roi_id) REFERENCES roi_configs(roi_id) ON DELETE CASCADE, FOREIGN KEY (algo_code) REFERENCES algorithm_registry(algo_code) ON DELETE RESTRICT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_bind_roi ON roi_algo_bind(roi_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_bind_algo ON roi_algo_bind(algo_code) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS alert_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, alert_id TEXT UNIQUE NOT NULL, camera_id TEXT NOT NULL, roi_id TEXT NOT NULL, bind_id TEXT, alert_type TEXT NOT NULL, target_class TEXT, confidence REAL, bbox TEXT, message TEXT, image_path TEXT, status TEXT DEFAULT 'pending', created_at TEXT NOT NULL, processed_at TEXT, duration_minutes REAL, detections TEXT, FOREIGN KEY (bind_id) REFERENCES roi_algo_bind(bind_id) ON DELETE SET NULL ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_camera ON alert_records(camera_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_created ON alert_records(created_at) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_status ON alert_records(status) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_bind ON alert_records(bind_id) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS camera_configs ( camera_id TEXT PRIMARY KEY, rtsp_url TEXT NOT NULL, camera_name TEXT, status BOOLEAN DEFAULT 1, enabled BOOLEAN DEFAULT 1, location TEXT, roi_group_id TEXT, extra_params TEXT, updated_at TEXT ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS config_update_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, config_type TEXT NOT NULL, config_id TEXT, old_value TEXT, new_value TEXT, updated_by TEXT, updated_at TEXT ) """) self._conn.commit() self._init_default_algorithms() def _init_default_algorithms(self): """初始化默认算法配置""" try: cursor = self._conn.cursor() algorithms = [ { 'algo_code': 'leave_post', 'algo_name': '离岗检测', 'target_class': 'person', 'param_schema': json.dumps({ "confirm_on_duty_sec": {"type": "int", "default": 10, "min": 1}, "confirm_leave_sec": {"type": "int", "default": 30, "min": 1}, "cooldown_sec": {"type": "int", "default": 600, "min": 0}, "working_hours": {"type": "list", "default": []}, }), 'description': '检测人员是否在岗,支持工作时间段配置' }, { 'algo_code': 'intrusion', 'algo_name': '周界入侵检测', 'target_class': 'person', 'param_schema': json.dumps({ "cooldown_seconds": {"type": "int", "default": 300, "min": 0}, "confirm_seconds": {"type": "int", "default": 5, "min": 1}, }), 'description': '检测人员进入指定区域,支持确认时间和冷却时间配置' }, { 'algo_code': 'crowd_detection', 'algo_name': '人群聚集检测', 'target_class': 'person', 'param_schema': json.dumps({ "max_count": {"type": "int", "default": 10, "min": 1}, "cooldown_seconds": {"type": "int", "default": 300, "min": 0}, }), 'description': '检测区域内人员数量是否超过阈值' }, ] for algo in algorithms: cursor.execute(""" INSERT OR IGNORE INTO algorithm_registry ( algo_code, algo_name, target_class, param_schema, description, is_active, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, 1, ?, ?) """, ( algo['algo_code'], algo['algo_name'], algo['target_class'], algo['param_schema'], algo['description'], datetime.now().isoformat(), datetime.now().isoformat() )) self._conn.commit() logger.info(f"已初始化 {len(algorithms)} 个默认算法配置") except Exception as e: logger.error(f"初始化默认算法失败: {e}") def _start_background_threads(self): """启动后台线程""" self._running = True self._write_thread = threading.Thread( target=self._write_worker, name="SQLiteWrite", daemon=True ) self._write_thread.start() self._cleanup_thread = threading.Thread( target=self._cleanup_worker, name="SQLiteCleanup", daemon=True ) self._cleanup_thread.start() def _write_worker(self): """异步写入工作线程""" batch = [] last_flush = time.time() while self._running: try: try: item = self._write_queue.get(timeout=1.0) batch.append(item) except queue.Empty: pass should_flush = ( len(batch) >= self.config.batch_size or time.time() - last_flush >= self.config.flush_interval ) if batch and (should_flush or len(batch) >= 1000): self._flush_batch(batch) batch.clear() last_flush = time.time() except Exception as e: logger.error(f"SQLite 写入异常: {e}") if batch: self._flush_batch(batch) def _flush_batch(self, batch: List[Dict[str, Any]]): """批量写入数据库""" try: cursor = self._conn.cursor() for record in batch: cursor.execute(""" INSERT OR REPLACE INTO alert_records ( alert_id, camera_id, roi_id, bind_id, alert_type, target_class, confidence, bbox, message, image_path, status, created_at, processed_at, duration_minutes, detections ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( record['alert_id'], record['camera_id'], record['roi_id'], record.get('bind_id'), record['alert_type'], record.get('target_class'), record.get('confidence'), record.get('bbox'), record.get('message'), record.get('image_path'), record.get('status', 'pending'), record['created_at'], record.get('processed_at'), record.get('duration_minutes'), record.get('detections'), )) self._conn.commit() logger.debug(f"批量写入 {len(batch)} 条记录") except Exception as e: logger.error(f"批量写入失败: {e}") def _cleanup_worker(self): """清理工作线程(每天执行一次)""" while self._running: try: time.sleep(3600) if self._running: self.cleanup_old_data() except Exception as e: logger.error(f"数据清理异常: {e}") def queue_alert(self, alert: AlertRecord): """将告警加入写入队列""" record = { 'alert_id': alert.alert_id, 'camera_id': alert.camera_id, 'roi_id': alert.roi_id, 'bind_id': alert.bind_id, 'alert_type': alert.alert_type, 'target_class': alert.target_class, 'confidence': alert.confidence, 'bbox': str(alert.bbox) if alert.bbox else None, 'message': alert.message, 'image_path': alert.image_path, 'status': alert.status, 'created_at': alert.created_at.isoformat(), 'processed_at': alert.processed_at.isoformat() if alert.processed_at else None, 'duration_minutes': alert.duration_minutes, 'detections': alert.detections, } self._write_queue.put(record) def get_alerts( self, camera_id: Optional[str] = None, status: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """查询告警记录""" cursor = self._conn.cursor() query = "SELECT * FROM alert_records WHERE 1=1" params = [] if camera_id: query += " AND camera_id = ?" params.append(camera_id) if status: query += " AND status = ?" params.append(status) if start_time: query += " AND created_at >= ?" params.append(start_time.isoformat()) if end_time: query += " AND created_at <= ?" params.append(end_time.isoformat()) query += " ORDER BY created_at DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type', 'target_class', 'confidence', 'bbox', 'message', 'image_path', 'status', 'created_at', 'processed_at'] return [dict(zip(columns, row)) for row in rows] def update_status(self, alert_id: str, status: str) -> bool: """更新告警状态""" try: cursor = self._conn.cursor() cursor.execute(""" UPDATE alert_records SET status = ?, processed_at = ? WHERE alert_id = ? """, (status, datetime.now().isoformat(), alert_id)) self._conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"更新状态失败: {e}") return False def cleanup_old_data(self): """清理过期数据""" try: cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat() cursor = self._conn.cursor() cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,)) images = cursor.fetchall() for (img_path,) in images: if img_path and os.path.exists(img_path): try: os.remove(img_path) except Exception: pass cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,)) deleted = cursor.rowcount self._conn.commit() logger.info(f"清理完成: 删除 {deleted} 条过期记录") return deleted except Exception as e: logger.error(f"数据清理失败: {e}") return 0 def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" cursor = self._conn.cursor() cursor.execute("SELECT COUNT(*) FROM alert_records") total = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'") pending = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?", ((datetime.now() - timedelta(hours=24)).isoformat(),)) today = cursor.fetchone()[0] db_size = os.path.getsize(self.config.db_path) / (1024 * 1024) return { "total_alerts": total, "pending_alerts": pending, "today_alerts": today, "db_size_mb": round(db_size, 2), "queue_size": self._write_queue.qsize(), "retention_days": self.config.retention_days, } def close(self): """关闭数据库""" self._running = False if self._write_thread and self._write_thread.is_alive(): self._write_thread.join(timeout=10) if self._conn: self._conn.close() logger.info("SQLite 数据库已关闭") def save_camera_config(self, camera_id: str, rtsp_url: str, **kwargs) -> bool: """保存摄像头配置""" try: cursor = self._conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO camera_configs ( camera_id, rtsp_url, camera_name, status, enabled, location, roi_group_id, extra_params, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( camera_id, rtsp_url, kwargs.get('camera_name'), kwargs.get('status', True), kwargs.get('enabled', True), kwargs.get('location'), kwargs.get('roi_group_id'), str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None, now )) self._conn.commit() return True except Exception as e: logger.error(f"保存摄像头配置失败: {e}") return False def get_camera_config(self, camera_id: str) -> Optional[Dict[str, Any]]: """获取摄像头配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,)) row = cursor.fetchone() if row: columns = ['camera_id', 'rtsp_url', 'camera_name', 'status', 'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at'] return dict(zip(columns, row)) return None except Exception as e: logger.error(f"获取摄像头配置失败: {e}") return None def get_all_camera_configs(self) -> List[Dict[str, Any]]: """获取所有摄像头配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id") columns = ['camera_id', 'rtsp_url', 'camera_name', 'status', 'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at'] return [dict(zip(columns, row)) for row in cursor.fetchall()] except Exception as e: logger.error(f"获取所有摄像头配置失败: {e}") return [] def delete_camera_config(self, camera_id: str) -> bool: """删除摄像头配置""" try: cursor = self._conn.cursor() cursor.execute("DELETE FROM camera_configs WHERE camera_id = ?", (camera_id,)) self._conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"删除摄像头配置失败: {e}") return False def save_roi_config(self, roi_id: str, camera_id: str, roi_type: str, coordinates: List, **kwargs) -> bool: """保存ROI配置(空间信息,不包含业务参数)""" try: cursor = self._conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO roi_configs ( roi_id, camera_id, roi_type, coordinates, enabled, priority, extra_params, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( roi_id, camera_id, roi_type, str(coordinates), kwargs.get('enabled', True), kwargs.get('priority', 0), str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None, now )) self._conn.commit() return True except Exception as e: logger.error(f"保存ROI配置失败: {e}") return False def get_roi_config(self, roi_id: str) -> Optional[Dict[str, Any]]: """获取ROI配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM roi_configs WHERE roi_id = ?", (roi_id,)) row = cursor.fetchone() if row: columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'enabled', 'priority', 'extra_params', 'updated_at'] result = dict(zip(columns, row)) try: result['coordinates'] = _normalize_coordinates(result['coordinates']) except: pass return result return None except Exception as e: logger.error(f"获取ROI配置失败: {e}") return None def get_rois_by_camera(self, camera_id: str) -> List[Dict[str, Any]]: """获取指定摄像头的所有ROI配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,)) columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'enabled', 'priority', 'extra_params', 'updated_at'] results = [] for row in cursor.fetchall(): r = dict(zip(columns, row)) try: r['coordinates'] = _normalize_coordinates(r['coordinates']) except: pass results.append(r) return results except Exception as e: logger.error(f"获取ROI配置失败: {e}") return [] def get_all_roi_configs(self) -> List[Dict[str, Any]]: """获取所有ROI配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM roi_configs ORDER BY camera_id, roi_id") columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'enabled', 'priority', 'extra_params', 'updated_at'] results = [] for row in cursor.fetchall(): r = dict(zip(columns, row)) try: r['coordinates'] = _normalize_coordinates(r['coordinates']) except: pass results.append(r) return results except Exception as e: logger.error(f"获取所有ROI配置失败: {e}") return [] def delete_roi_config(self, roi_id: str) -> bool: """删除ROI配置""" try: cursor = self._conn.cursor() cursor.execute("DELETE FROM roi_configs WHERE roi_id = ?", (roi_id,)) self._conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"删除ROI配置失败: {e}") return False def save_algorithm(self, algo_code: str, algo_name: str, target_class: str = "person", param_schema: Optional[str] = None, description: Optional[str] = None, is_active: bool = True) -> bool: """保存算法配置""" try: cursor = self._conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO algorithm_registry ( algo_code, algo_name, target_class, param_schema, description, is_active, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( algo_code, algo_name, target_class, param_schema, description, is_active, now, now )) self._conn.commit() return True except Exception as e: logger.error(f"保存算法配置失败: {e}") return False def get_algorithm(self, algo_code: str) -> Optional[Dict[str, Any]]: """获取算法配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM algorithm_registry WHERE algo_code = ?", (algo_code,)) row = cursor.fetchone() if row: columns = ['algo_code', 'algo_name', 'target_class', 'param_schema', 'description', 'is_active', 'created_at', 'updated_at'] result = dict(zip(columns, row)) try: if result.get('param_schema'): result['param_schema'] = json.loads(result['param_schema']) except: pass return result return None except Exception as e: logger.error(f"获取算法配置失败: {e}") return None def get_all_algorithms(self, active_only: bool = True) -> List[Dict[str, Any]]: """获取所有算法配置""" try: cursor = self._conn.cursor() query = "SELECT * FROM algorithm_registry" if active_only: query += " WHERE is_active = 1" cursor.execute(query) columns = ['algo_code', 'algo_name', 'target_class', 'param_schema', 'description', 'is_active', 'created_at', 'updated_at'] results = [] for row in cursor.fetchall(): r = dict(zip(columns, row)) try: if r.get('param_schema'): r['param_schema'] = json.loads(r['param_schema']) except: pass results.append(r) return results except Exception as e: logger.error(f"获取所有算法配置失败: {e}") return [] def save_roi_algo_bind(self, bind_id: str, roi_id: str, algo_code: str, params: Dict[str, Any], priority: int = 0, enabled: bool = True) -> bool: """保存ROI与算法的绑定关系""" try: cursor = self._conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO roi_algo_bind ( bind_id, roi_id, algo_code, params, priority, enabled, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( bind_id, roi_id, algo_code, json.dumps(params), priority, enabled, now, now )) self._conn.commit() return True except Exception as e: logger.error(f"保存ROI算法绑定失败: {e}") return False def get_roi_algo_bind(self, bind_id: str) -> Optional[Dict[str, Any]]: """获取ROI算法绑定配置""" try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM roi_algo_bind WHERE bind_id = ?", (bind_id,)) row = cursor.fetchone() if row: columns = ['bind_id', 'roi_id', 'algo_code', 'params', 'priority', 'enabled', 'created_at', 'updated_at'] result = dict(zip(columns, row)) try: if result.get('params'): result['params'] = json.loads(result['params']) except: pass return result return None except Exception as e: logger.error(f"获取ROI算法绑定失败: {e}") return None def get_bindings_by_roi(self, roi_id: str) -> List[Dict[str, Any]]: """获取指定ROI的所有算法绑定""" try: cursor = self._conn.cursor() cursor.execute(""" SELECT b.*, a.algo_name, a.target_class FROM roi_algo_bind b LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code WHERE b.roi_id = ? AND b.enabled = 1 ORDER BY b.priority DESC """, (roi_id,)) results = [] for row in cursor.fetchall(): result = dict(zip( ['bind_id', 'roi_id', 'algo_code', 'params', 'priority', 'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class'], row )) try: if result.get('params'): result['params'] = json.loads(result['params']) except: pass results.append(result) return results except Exception as e: logger.error(f"获取ROI算法绑定失败: {e}") return [] def get_bindings_by_camera(self, camera_id: str) -> List[Dict[str, Any]]: """获取指定摄像头的所有ROI算法绑定""" try: cursor = self._conn.cursor() cursor.execute(""" SELECT b.*, a.algo_name, a.target_class, r.roi_type, r.coordinates FROM roi_algo_bind b LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code LEFT JOIN roi_configs r ON b.roi_id = r.roi_id WHERE r.camera_id = ? AND b.enabled = 1 AND r.enabled = 1 ORDER BY r.priority DESC, b.priority DESC """, (camera_id,)) results = [] for row in cursor.fetchall(): result = dict(zip( ['bind_id', 'roi_id', 'algo_code', 'params', 'priority', 'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class', 'roi_type', 'coordinates'], row )) try: if result.get('params'): result['params'] = json.loads(result['params']) if result.get('coordinates'): result['coordinates'] = _normalize_coordinates(result['coordinates']) except: pass results.append(result) return results except Exception as e: logger.error(f"获取摄像头算法绑定失败: {e}") return [] def delete_roi_algo_bind(self, bind_id: str) -> bool: """删除ROI算法绑定""" try: cursor = self._conn.cursor() cursor.execute("DELETE FROM roi_algo_bind WHERE bind_id = ?", (bind_id,)) self._conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"删除ROI算法绑定失败: {e}") return False def delete_bindings_by_roi(self, roi_id: str) -> int: """删除指定ROI的所有算法绑定""" try: cursor = self._conn.cursor() cursor.execute("DELETE FROM roi_algo_bind WHERE roi_id = ?", (roi_id,)) self._conn.commit() return cursor.rowcount except Exception as e: logger.error(f"删除ROI算法绑定失败: {e}") return 0 def log_config_update( self, config_type: str, config_id: Optional[str], old_value: Any, new_value: Any, updated_by: str = "system" ): """记录配置更新日志""" try: cursor = self._conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT INTO config_update_log ( config_type, config_id, old_value, new_value, updated_by, updated_at ) VALUES (?, ?, ?, ?, ?, ?) """, ( config_type, config_id, str(old_value) if old_value else None, str(new_value) if new_value else None, updated_by, now )) self._conn.commit() logger.info(f"配置更新日志已记录: {config_type}/{config_id}") except Exception as e: logger.error(f"记录配置更新日志失败: {e}") def get_config_update_log( self, config_type: Optional[str] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """获取配置更新日志""" try: cursor = self._conn.cursor() query = "SELECT * FROM config_update_log WHERE 1=1" params = [] if config_type: query += " AND config_type = ?" params.append(config_type) query += " ORDER BY id DESC LIMIT ?" params.append(limit) cursor.execute(query, params) columns = ['id', 'config_type', 'config_id', 'old_value', 'new_value', 'updated_by', 'updated_at'] return [dict(zip(columns, row)) for row in cursor.fetchall()] except Exception as e: logger.error(f"获取配置更新日志失败: {e}") return [] def get_sqlite_manager() -> SQLiteManager: """获取 SQLite 管理器单例""" return SQLiteManager()