""" SQLite 数据库模块 边缘AI推理服务的本地数据存储 特性: - WAL 模式(Write-Ahead Logging)提升写入性能 - 异步写入策略 - 滚动清理机制(保留7天数据) """ import os import sqlite3 import threading import queue import time import logging from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Generator from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) @dataclass class StorageConfig: """存储配置类""" db_path: str = "./data/security_events.db" image_dir: str = "./data/captures" retention_days: int = 7 wal_mode: bool = True batch_size: int = 100 flush_interval: float = 5.0 @dataclass class AlertRecord: """告警记录""" alert_id: str camera_id: str roi_id: str alert_type: str target_class: Optional[str] = None confidence: Optional[float] = None bbox: Optional[List[float]] = None message: Optional[str] = None image_path: Optional[str] = None status: str = "pending" created_at: datetime = field(default_factory=datetime.now) processed_at: Optional[datetime] = None class SQLiteManager: """SQLite 数据库管理器""" _instance = None _lock = threading.Lock() def __new__(cls, config: Optional[StorageConfig] = None): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, config: Optional[StorageConfig] = None): if self._initialized: return if config is None: config = StorageConfig() self.config = config self._conn: Optional[sqlite3.Connection] = None self._write_queue: queue.Queue = queue.Queue() self._running = False self._write_thread: Optional[threading.Thread] = None self._cleanup_thread: Optional[threading.Thread] = None self._init_directories() self._init_database() self._start_background_threads() self._initialized = True logger.info(f"SQLite 数据库初始化成功: {config.db_path}") def _init_directories(self): """初始化目录""" Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True) Path(self.config.image_dir).mkdir(parents=True, exist_ok=True) def _init_database(self): """初始化数据库表""" self._conn = sqlite3.connect( self.config.db_path, check_same_thread=False, timeout=30.0 ) if self.config.wal_mode: cursor = self._conn.cursor() cursor.execute("PRAGMA journal_mode=WAL;") cursor.execute("PRAGMA synchronous=NORMAL;") cursor.execute("PRAGMA cache_size=-64000;") self._conn.commit() cursor = self._conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS alert_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, alert_id TEXT UNIQUE NOT NULL, camera_id TEXT NOT NULL, roi_id TEXT NOT NULL, alert_type TEXT NOT NULL, target_class TEXT, confidence REAL, bbox TEXT, message TEXT, image_path TEXT, status TEXT DEFAULT 'pending', created_at TEXT NOT NULL, processed_at TEXT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_camera ON alert_records(camera_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_created ON alert_records(created_at) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_alert_status ON alert_records(status) """) self._conn.commit() def _start_background_threads(self): """启动后台线程""" self._running = True self._write_thread = threading.Thread( target=self._write_worker, name="SQLiteWrite", daemon=True ) self._write_thread.start() self._cleanup_thread = threading.Thread( target=self._cleanup_worker, name="SQLiteCleanup", daemon=True ) self._cleanup_thread.start() def _write_worker(self): """异步写入工作线程""" batch = [] last_flush = time.time() while self._running: try: try: item = self._write_queue.get(timeout=1.0) batch.append(item) except queue.Empty: pass should_flush = ( len(batch) >= self.config.batch_size or time.time() - last_flush >= self.config.flush_interval ) if batch and (should_flush or len(batch) >= 1000): self._flush_batch(batch) batch.clear() last_flush = time.time() except Exception as e: logger.error(f"SQLite 写入异常: {e}") if batch: self._flush_batch(batch) def _flush_batch(self, batch: List[Dict[str, Any]]): """批量写入数据库""" try: cursor = self._conn.cursor() for record in batch: cursor.execute(""" INSERT OR REPLACE INTO alert_records ( alert_id, camera_id, roi_id, alert_type, target_class, confidence, bbox, message, image_path, status, created_at, processed_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( record['alert_id'], record['camera_id'], record['roi_id'], record['alert_type'], record.get('target_class'), record.get('confidence'), record.get('bbox'), record.get('message'), record.get('image_path'), record.get('status', 'pending'), record['created_at'], record.get('processed_at'), )) self._conn.commit() logger.debug(f"批量写入 {len(batch)} 条记录") except Exception as e: logger.error(f"批量写入失败: {e}") def _cleanup_worker(self): """清理工作线程(每天执行一次)""" while self._running: try: time.sleep(3600) if self._running: self.cleanup_old_data() except Exception as e: logger.error(f"数据清理异常: {e}") def queue_alert(self, alert: AlertRecord): """将告警加入写入队列""" record = { 'alert_id': alert.alert_id, 'camera_id': alert.camera_id, 'roi_id': alert.roi_id, 'alert_type': alert.alert_type, 'target_class': alert.target_class, 'confidence': alert.confidence, 'bbox': str(alert.bbox) if alert.bbox else None, 'message': alert.message, 'image_path': alert.image_path, 'status': alert.status, 'created_at': alert.created_at.isoformat(), 'processed_at': alert.processed_at.isoformat() if alert.processed_at else None, } self._write_queue.put(record) def get_alerts( self, camera_id: Optional[str] = None, status: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """查询告警记录""" cursor = self._conn.cursor() query = "SELECT * FROM alert_records WHERE 1=1" params = [] if camera_id: query += " AND camera_id = ?" params.append(camera_id) if status: query += " AND status = ?" params.append(status) if start_time: query += " AND created_at >= ?" params.append(start_time.isoformat()) if end_time: query += " AND created_at <= ?" params.append(end_time.isoformat()) query += " ORDER BY created_at DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type', 'target_class', 'confidence', 'bbox', 'message', 'image_path', 'status', 'created_at', 'processed_at'] return [dict(zip(columns, row)) for row in rows] def update_status(self, alert_id: str, status: str) -> bool: """更新告警状态""" try: cursor = self._conn.cursor() cursor.execute(""" UPDATE alert_records SET status = ?, processed_at = ? WHERE alert_id = ? """, (status, datetime.now().isoformat(), alert_id)) self._conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"更新状态失败: {e}") return False def cleanup_old_data(self): """清理过期数据""" try: cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat() cursor = self._conn.cursor() cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,)) images = cursor.fetchall() for (img_path,) in images: if img_path and os.path.exists(img_path): try: os.remove(img_path) except Exception: pass cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,)) deleted = cursor.rowcount self._conn.commit() logger.info(f"清理完成: 删除 {deleted} 条过期记录") return deleted except Exception as e: logger.error(f"数据清理失败: {e}") return 0 def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" cursor = self._conn.cursor() cursor.execute("SELECT COUNT(*) FROM alert_records") total = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'") pending = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?", ((datetime.now() - timedelta(hours=24)).isoformat(),)) today = cursor.fetchone()[0] db_size = os.path.getsize(self.config.db_path) / (1024 * 1024) return { "total_alerts": total, "pending_alerts": pending, "today_alerts": today, "db_size_mb": round(db_size, 2), "queue_size": self._write_queue.qsize(), "retention_days": self.config.retention_days, } def close(self): """关闭数据库""" self._running = False if self._write_thread and self._write_thread.is_alive(): self._write_thread.join(timeout=10) if self._conn: self._conn.close() logger.info("SQLite 数据库已关闭") def get_sqlite_manager() -> SQLiteManager: """获取 SQLite 管理器单例""" return SQLiteManager()