# security-ai-edge/config/database.py
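"""
SQLite database module.

Local data storage for the edge AI inference service.

Features:
- WAL (Write-Ahead Logging) mode to improve write performance
- Asynchronous write strategy
- Rolling cleanup mechanism that retains 7 days of data
"""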
import ast
import logging
import os
import queue
import sqlite3
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Generator
# Module-level logger, named after this module, used by every class below.
logger = logging.getLogger(__name__)
@dataclass
class StorageConfig:
    """Settings for the local SQLite store and the captured-image directory."""

    # Path of the SQLite database file.
    db_path: str = "./data/security_events.db"
    # Directory in which captured images are kept.
    image_dir: str = "./data/captures"
    # Alert records older than this many days are removed by the cleanup job.
    retention_days: int = 7
    # Whether to switch the database to WAL journaling when it is opened.
    wal_mode: bool = True
    # Number of queued records that triggers a batched write.
    batch_size: int = 100
    # Maximum number of seconds between two batched writes.
    flush_interval: float = 5.0
@dataclass
class AlertRecord:
    """A single alert as produced by the inference pipeline.

    The first four fields are required; everything else is optional and
    defaults to ``None`` (or ``"pending"`` for ``status``).
    """

    # --- required identification fields ---
    alert_id: str
    camera_id: str
    roi_id: str
    alert_type: str

    # --- optional detection details ---
    target_class: Optional[str] = None
    confidence: Optional[float] = None
    bbox: Optional[List[float]] = None
    message: Optional[str] = None
    image_path: Optional[str] = None

    # --- lifecycle ---
    status: str = "pending"
    # Filled with the local wall-clock time at construction when not supplied.
    created_at: datetime = field(default_factory=datetime.now)
    processed_at: Optional[datetime] = None
class SQLiteManager:
    """Singleton manager for the local SQLite store.

    One connection (opened with ``check_same_thread=False``) is shared by
    the caller threads, the background writer thread and the background
    cleanup thread. Every access to that connection is serialized through
    ``self._db_lock`` so that one thread's commit can never publish another
    thread's half-finished transaction.

    Alerts are written asynchronously: :meth:`queue_alert` only enqueues,
    and the writer thread persists them in batches. Camera and ROI
    configuration is written synchronously.

    Only the first construction (or the first one after :meth:`close`)
    uses the ``config`` argument; later calls return the existing instance
    and ignore it.
    """

    _instance = None
    _lock = threading.Lock()

    # Seconds between two runs of the retention cleanup.
    _CLEANUP_INTERVAL_SECONDS = 3600
    # Upper bound on the batch length even if ``config.batch_size`` is larger.
    _MAX_BATCH_SIZE = 1000

    _WAL_PRAGMAS = (
        "PRAGMA journal_mode=WAL;",
        "PRAGMA synchronous=NORMAL;",
        "PRAGMA cache_size=-64000;",
    )

    _SCHEMA_STATEMENTS = (
        """
        CREATE TABLE IF NOT EXISTS alert_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            alert_id TEXT UNIQUE NOT NULL,
            camera_id TEXT NOT NULL,
            roi_id TEXT NOT NULL,
            alert_type TEXT NOT NULL,
            target_class TEXT,
            confidence REAL,
            bbox TEXT,
            message TEXT,
            image_path TEXT,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            processed_at TEXT
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_alert_camera
        ON alert_records(camera_id)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_alert_created
        ON alert_records(created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_alert_status
        ON alert_records(status)
        """,
        """
        CREATE TABLE IF NOT EXISTS camera_configs (
            camera_id TEXT PRIMARY KEY,
            rtsp_url TEXT NOT NULL,
            camera_name TEXT,
            status BOOLEAN DEFAULT 1,
            enabled BOOLEAN DEFAULT 1,
            location TEXT,
            extra_params TEXT,
            updated_at TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS roi_configs (
            roi_id TEXT PRIMARY KEY,
            camera_id TEXT NOT NULL,
            roi_type TEXT NOT NULL,
            coordinates TEXT NOT NULL,
            algorithm_type TEXT NOT NULL,
            alert_threshold INTEGER DEFAULT 3,
            alert_cooldown INTEGER DEFAULT 300,
            enabled BOOLEAN DEFAULT 1,
            extra_params TEXT,
            updated_at TEXT
        )
        """,
    )

    _INSERT_ALERT_SQL = """
        INSERT OR REPLACE INTO alert_records (
            alert_id, camera_id, roi_id, alert_type,
            target_class, confidence, bbox, message,
            image_path, status, created_at, processed_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    def __new__(cls, config: Optional[StorageConfig] = None):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._initialized = False
                    # Publish only after the flag exists so __init__ can rely on it.
                    cls._instance = instance
        return cls._instance

    def __init__(self, config: Optional[StorageConfig] = None):
        """Open the database and start the worker threads (first call only).

        Args:
            config: Storage settings; a default ``StorageConfig()`` is used
                when omitted. Ignored when the singleton is already
                initialized.
        """
        if self._initialized:
            return
        # __new__ has already released _lock, so taking it here cannot
        # self-deadlock; it stops two threads from initializing concurrently.
        with self._lock:
            if self._initialized:
                return
            if config is None:
                config = StorageConfig()
            self.config = config
            self._conn: Optional[sqlite3.Connection] = None
            # Serializes every use of the shared connection.
            self._db_lock = threading.RLock()
            # Lets close() wake the cleanup thread without waiting an hour.
            self._stop_event = threading.Event()
            self._write_queue: queue.Queue = queue.Queue()
            self._running = False
            self._write_thread: Optional[threading.Thread] = None
            self._cleanup_thread: Optional[threading.Thread] = None
            self._init_directories()
            self._init_database()
            self._start_background_threads()
            self._initialized = True
            logger.info("SQLite 数据库初始化成功: %s", config.db_path)

    # ------------------------------------------------------------------
    # Initialization helpers
    # ------------------------------------------------------------------

    def _init_directories(self):
        """Create the database's parent directory and the image directory."""
        Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True)
        Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)

    def _init_database(self):
        """Open the connection, apply the WAL pragmas and create the schema."""
        conn = sqlite3.connect(
            self.config.db_path,
            check_same_thread=False,
            timeout=30.0,
        )
        self._conn = conn
        if self.config.wal_mode:
            for pragma in self._WAL_PRAGMAS:
                conn.execute(pragma)
            conn.commit()
        # `with conn` commits on success and rolls back on error.
        with conn:
            for statement in self._SCHEMA_STATEMENTS:
                conn.execute(statement)

    def _start_background_threads(self):
        """Start the daemon writer and cleanup threads."""
        self._running = True
        self._stop_event.clear()
        self._write_thread = threading.Thread(
            target=self._write_worker,
            name="SQLiteWrite",
            daemon=True,
        )
        self._write_thread.start()
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_worker,
            name="SQLiteCleanup",
            daemon=True,
        )
        self._cleanup_thread.start()

    # ------------------------------------------------------------------
    # Background workers
    # ------------------------------------------------------------------

    def _write_worker(self):
        """Writer thread: collect queued alerts and persist them in batches.

        A batch is flushed when it reaches ``config.batch_size`` (capped at
        ``_MAX_BATCH_SIZE``) or when ``config.flush_interval`` seconds have
        passed since the previous flush. On shutdown the queue is drained
        so alerts queued just before ``close()`` are not lost.
        """
        batch: List[Dict[str, Any]] = []
        last_flush = time.time()
        while self._running:
            try:
                try:
                    batch.append(self._write_queue.get(timeout=1.0))
                except queue.Empty:
                    pass
                size_reached = (
                    len(batch) >= self.config.batch_size
                    or len(batch) >= self._MAX_BATCH_SIZE
                )
                interval_elapsed = (
                    time.time() - last_flush >= self.config.flush_interval
                )
                if batch and (size_reached or interval_elapsed):
                    self._flush_batch(batch)
                    batch.clear()
                    last_flush = time.time()
            except Exception as e:
                logger.error("SQLite 写入异常: %s", e)

        # Shutdown: pick up whatever is still waiting in the queue.
        while True:
            try:
                batch.append(self._write_queue.get_nowait())
            except queue.Empty:
                break
        if batch:
            self._flush_batch(batch)

    @staticmethod
    def _alert_row(record: Dict[str, Any]) -> tuple:
        """Convert a queued alert dict into the parameter tuple for the INSERT."""
        return (
            record['alert_id'],
            record['camera_id'],
            record['roi_id'],
            record['alert_type'],
            record.get('target_class'),
            record.get('confidence'),
            record.get('bbox'),
            record.get('message'),
            record.get('image_path'),
            record.get('status', 'pending'),
            record['created_at'],
            record.get('processed_at'),
        )

    def _flush_batch(self, batch: List[Dict[str, Any]]):
        """Write a batch of alert dicts in a single transaction.

        Failures are logged, never raised. The transaction is rolled back
        on error so no partial batch is left pending on the connection. If
        the failure is a constraint violation, the rows are retried one by
        one so a single invalid record does not discard the others.
        """
        try:
            rows = [self._alert_row(record) for record in batch]
            with self._db_lock, self._conn as conn:
                conn.executemany(self._INSERT_ALERT_SQL, rows)
            logger.debug("批量写入 %s 条记录", len(batch))
        except sqlite3.IntegrityError as e:
            logger.error("批量写入失败: %s", e)
            self._flush_rows_individually(rows)
        except Exception as e:
            logger.error("批量写入失败: %s", e)

    def _flush_rows_individually(self, rows: List[tuple]):
        """Insert rows one transaction at a time, skipping rows that fail."""
        for row in rows:
            try:
                with self._db_lock, self._conn as conn:
                    conn.execute(self._INSERT_ALERT_SQL, row)
            except Exception as e:
                # row[0] is alert_id (see _alert_row).
                logger.error("跳过无效告警记录 %s: %s", row[0], e)

    def _cleanup_worker(self):
        """Cleanup thread: run the retention cleanup once per hour.

        Waits on the stop event instead of sleeping so that ``close()`` can
        end the thread immediately.
        """
        while not self._stop_event.wait(self._CLEANUP_INTERVAL_SECONDS):
            if not self._running:
                break
            try:
                self.cleanup_old_data()
            except Exception as e:
                logger.error("数据清理异常: %s", e)

    # ------------------------------------------------------------------
    # Shared read helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _rows_as_dicts(cursor: sqlite3.Cursor) -> List[Dict[str, Any]]:
        """Return all remaining rows of ``cursor`` as column-name → value dicts.

        Column names come from ``cursor.description`` so the mapping always
        matches the actual column order of the result set.
        """
        names = [column[0] for column in cursor.description]
        return [dict(zip(names, row)) for row in cursor.fetchall()]

    @staticmethod
    def _parse_coordinates(raw: Any) -> Any:
        """Parse the stored ``str(coordinates)`` text back into Python data.

        SECURITY: this used to call ``eval()`` on text read from the
        database, which would execute arbitrary code if the file were
        tampered with. ``ast.literal_eval`` accepts only literals, which
        covers the text written by :meth:`save_roi_config`. Text that
        cannot be parsed is returned unchanged.
        """
        try:
            return ast.literal_eval(raw)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return raw

    def _select_rois(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run an ROI query and return dict rows with decoded coordinates."""
        with self._db_lock:
            cursor = self._conn.cursor()
            cursor.execute(query, params)
            rois = self._rows_as_dicts(cursor)
        for roi in rois:
            roi['coordinates'] = self._parse_coordinates(roi['coordinates'])
        return rois

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------

    def queue_alert(self, alert: AlertRecord):
        """Enqueue an alert for asynchronous persistence.

        ``bbox`` is stored as its ``str()`` text and timestamps as ISO-8601
        strings. The record is written later by the writer thread.
        """
        record = {
            'alert_id': alert.alert_id,
            'camera_id': alert.camera_id,
            'roi_id': alert.roi_id,
            'alert_type': alert.alert_type,
            'target_class': alert.target_class,
            'confidence': alert.confidence,
            'bbox': str(alert.bbox) if alert.bbox else None,
            'message': alert.message,
            'image_path': alert.image_path,
            'status': alert.status,
            'created_at': alert.created_at.isoformat(),
            'processed_at': alert.processed_at.isoformat() if alert.processed_at else None,
        }
        self._write_queue.put(record)

    def get_alerts(
        self,
        camera_id: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Query alert records, newest first.

        Args:
            camera_id: Only alerts of this camera (ignored when empty).
            status: Only alerts with this status (ignored when empty).
            start_time: Inclusive lower bound on ``created_at``.
            end_time: Inclusive upper bound on ``created_at``.
            limit: Maximum number of rows returned.

        Returns:
            One dict per row, keyed by column name.

        Raises:
            sqlite3.Error: if the query fails (not caught here).
        """
        query = "SELECT * FROM alert_records WHERE 1=1"
        params: List[Any] = []
        if camera_id:
            query += " AND camera_id = ?"
            params.append(camera_id)
        if status:
            query += " AND status = ?"
            params.append(status)
        # created_at is stored as ISO-8601 text, so bounds are compared as text.
        if start_time:
            query += " AND created_at >= ?"
            params.append(start_time.isoformat())
        if end_time:
            query += " AND created_at <= ?"
            params.append(end_time.isoformat())
        query += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)

        with self._db_lock:
            cursor = self._conn.cursor()
            cursor.execute(query, params)
            return self._rows_as_dicts(cursor)

    def update_status(self, alert_id: str, status: str) -> bool:
        """Set an alert's status and stamp ``processed_at`` with the current time.

        Returns:
            True when a row was updated; False when no row matched or the
            update failed (the error is logged).
        """
        try:
            with self._db_lock, self._conn as conn:
                cursor = conn.execute("""
                    UPDATE alert_records
                    SET status = ?, processed_at = ?
                    WHERE alert_id = ?
                """, (status, datetime.now().isoformat(), alert_id))
                updated = cursor.rowcount > 0
            return updated
        except Exception as e:
            logger.error("更新状态失败: %s", e)
            return False

    def cleanup_old_data(self) -> int:
        """Delete alerts older than ``config.retention_days`` and their images.

        Rows are deleted first, in one transaction; image files are removed
        only after that transaction has committed, so a failed delete never
        leaves rows pointing at images that are already gone.

        Returns:
            Number of deleted rows, or 0 when the cleanup failed (logged).
        """
        try:
            cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat()

            with self._db_lock, self._conn as conn:
                expired_images = [
                    img_path
                    for (img_path,) in conn.execute(
                        "SELECT image_path FROM alert_records WHERE created_at < ?",
                        (cutoff,),
                    )
                    if img_path
                ]
                deleted = conn.execute(
                    "DELETE FROM alert_records WHERE created_at < ?", (cutoff,)
                ).rowcount

            # File removal is best effort and done without holding the DB lock.
            for img_path in expired_images:
                try:
                    os.remove(img_path)
                except FileNotFoundError:
                    pass
                except OSError as e:
                    logger.warning("删除过期图片失败 %s: %s", img_path, e)

            logger.info("清理完成: 删除 %s 条过期记录", deleted)
            return deleted
        except Exception as e:
            logger.error("数据清理失败: %s", e)
            return 0

    def get_statistics(self) -> Dict[str, Any]:
        """Return counters about stored alerts and the store itself.

        ``today_alerts`` counts alerts created within the last 24 hours.
        ``db_size_mb`` is 0.0 when the database file cannot be stat'ed.

        Raises:
            sqlite3.Error: if a query fails (not caught here).
        """
        since = (datetime.now() - timedelta(hours=24)).isoformat()
        with self._db_lock:
            cursor = self._conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM alert_records")
            total = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'")
            pending = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?", (since,))
            today = cursor.fetchone()[0]

        try:
            db_size = os.path.getsize(self.config.db_path) / (1024 * 1024)
        except OSError:
            db_size = 0.0

        return {
            "total_alerts": total,
            "pending_alerts": pending,
            "today_alerts": today,
            "db_size_mb": round(db_size, 2),
            "queue_size": self._write_queue.qsize(),
            "retention_days": self.config.retention_days,
        }

    def close(self):
        """Stop the worker threads, flush pending alerts and close the database.

        Safe to call more than once. Afterwards the singleton is marked as
        uninitialized, so the next ``SQLiteManager()`` call reopens the
        database instead of handing out an instance with a closed
        connection.
        """
        with self._lock:
            if not self._initialized:
                return
            self._running = False
            self._stop_event.set()
            for worker in (self._write_thread, self._cleanup_thread):
                if worker is not None and worker.is_alive():
                    worker.join(timeout=10)
            with self._db_lock:
                if self._conn is not None:
                    self._conn.close()
                    self._conn = None
            self._initialized = False
        logger.info("SQLite 数据库已关闭")

    # ------------------------------------------------------------------
    # Camera configuration
    # ------------------------------------------------------------------

    def save_camera_config(self, camera_id: str, rtsp_url: str, **kwargs) -> bool:
        """Insert or replace a camera configuration row.

        Recognized keyword arguments: ``camera_name``, ``status`` (default
        True), ``enabled`` (default True), ``location`` and
        ``extra_params`` (stored as its ``str()`` text).

        Returns:
            True on success, False when the write failed (logged).
        """
        try:
            extra_params = kwargs.get('extra_params')
            row = (
                camera_id, rtsp_url,
                kwargs.get('camera_name'),
                kwargs.get('status', True),
                kwargs.get('enabled', True),
                kwargs.get('location'),
                str(extra_params) if extra_params else None,
                datetime.now().isoformat(),
            )
            with self._db_lock, self._conn as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO camera_configs (
                        camera_id, rtsp_url, camera_name, status, enabled,
                        location, extra_params, updated_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, row)
            return True
        except Exception as e:
            logger.error("保存摄像头配置失败: %s", e)
            return False

    def get_camera_config(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Return one camera configuration as a dict, or None if absent or on error."""
        try:
            with self._db_lock:
                cursor = self._conn.cursor()
                cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
                rows = self._rows_as_dicts(cursor)
            return rows[0] if rows else None
        except Exception as e:
            logger.error("获取摄像头配置失败: %s", e)
            return None

    def get_all_camera_configs(self) -> List[Dict[str, Any]]:
        """Return every camera configuration ordered by ``camera_id`` ([] on error)."""
        try:
            with self._db_lock:
                cursor = self._conn.cursor()
                cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
                return self._rows_as_dicts(cursor)
        except Exception as e:
            logger.error("获取所有摄像头配置失败: %s", e)
            return []

    def delete_camera_config(self, camera_id: str) -> bool:
        """Delete a camera configuration; True when a row was removed."""
        try:
            with self._db_lock, self._conn as conn:
                cursor = conn.execute("DELETE FROM camera_configs WHERE camera_id = ?", (camera_id,))
                removed = cursor.rowcount > 0
            return removed
        except Exception as e:
            logger.error("删除摄像头配置失败: %s", e)
            return False

    # ------------------------------------------------------------------
    # ROI configuration
    # ------------------------------------------------------------------

    def save_roi_config(self, roi_id: str, camera_id: str, roi_type: str,
                        coordinates: List, algorithm_type: str, **kwargs) -> bool:
        """Insert or replace an ROI configuration row.

        ``coordinates`` is stored as its ``str()`` text. Recognized keyword
        arguments: ``alert_threshold`` (default 3), ``alert_cooldown``
        (default 300), ``enabled`` (default True) and ``extra_params``
        (stored as its ``str()`` text).

        Returns:
            True on success, False when the write failed (logged).
        """
        try:
            extra_params = kwargs.get('extra_params')
            row = (
                roi_id, camera_id, roi_type, str(coordinates), algorithm_type,
                kwargs.get('alert_threshold', 3),
                kwargs.get('alert_cooldown', 300),
                kwargs.get('enabled', True),
                str(extra_params) if extra_params else None,
                datetime.now().isoformat(),
            )
            with self._db_lock, self._conn as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO roi_configs (
                        roi_id, camera_id, roi_type, coordinates, algorithm_type,
                        alert_threshold, alert_cooldown, enabled, extra_params, updated_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, row)
            return True
        except Exception as e:
            logger.error("保存ROI配置失败: %s", e)
            return False

    def get_roi_config(self, roi_id: str) -> Optional[Dict[str, Any]]:
        """Return one ROI configuration with decoded coordinates, or None."""
        try:
            rois = self._select_rois(
                "SELECT * FROM roi_configs WHERE roi_id = ?", (roi_id,)
            )
            return rois[0] if rois else None
        except Exception as e:
            logger.error("获取ROI配置失败: %s", e)
            return None

    def get_rois_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
        """Return all ROI configurations of one camera ([] on error)."""
        try:
            return self._select_rois(
                "SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,)
            )
        except Exception as e:
            logger.error("获取ROI配置失败: %s", e)
            return []

    def get_all_roi_configs(self) -> List[Dict[str, Any]]:
        """Return every ROI configuration ordered by camera and ROI id ([] on error)."""
        try:
            return self._select_rois(
                "SELECT * FROM roi_configs ORDER BY camera_id, roi_id"
            )
        except Exception as e:
            logger.error("获取所有ROI配置失败: %s", e)
            return []

    def delete_roi_config(self, roi_id: str) -> bool:
        """Delete an ROI configuration; True when a row was removed."""
        try:
            with self._db_lock, self._conn as conn:
                cursor = conn.execute("DELETE FROM roi_configs WHERE roi_id = ?", (roi_id,))
                removed = cursor.rowcount > 0
            return removed
        except Exception as e:
            logger.error("删除ROI配置失败: %s", e)
            return False
def get_sqlite_manager() -> SQLiteManager:
    """Return the shared ``SQLiteManager`` instance.

    ``SQLiteManager`` is constructed without arguments, so the default
    storage configuration applies if this call is the one that creates it.
    """
    manager = SQLiteManager()
    return manager