Files
security-ai-edge/config/database.py
16337 f90ff60f6c feat: 添加离岗时长记录
- alert_records 添加 duration_minutes 字段
- AlgorithmManager 输出 duration_minutes
- AlertRecord 添加 duration_minutes 字段
2026-01-30 17:26:12 +08:00

668 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SQLite 数据库模块
边缘AI推理服务的本地数据存储
特性:
- WAL 模式Write-Ahead Logging提升写入性能
- 异步写入策略
- 滚动清理机制保留7天数据
"""
import os
import sqlite3
import threading
import queue
import time
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class StorageConfig:
"""存储配置类"""
db_path: str = "./data/security_events.db"
image_dir: str = "./data/captures"
retention_days: int = 7
wal_mode: bool = True
batch_size: int = 100
flush_interval: float = 5.0
@dataclass
class AlertRecord:
"""告警记录"""
alert_id: str
camera_id: str
roi_id: str
alert_type: str
target_class: Optional[str] = None
confidence: Optional[float] = None
bbox: Optional[List[float]] = None
message: Optional[str] = None
image_path: Optional[str] = None
status: str = "pending"
created_at: datetime = field(default_factory=datetime.now)
processed_at: Optional[datetime] = None
duration_minutes: Optional[float] = None
class SQLiteManager:
"""SQLite 数据库管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls, config: Optional[StorageConfig] = None):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, config: Optional[StorageConfig] = None):
if self._initialized:
return
if config is None:
config = StorageConfig()
self.config = config
self._conn: Optional[sqlite3.Connection] = None
self._write_queue: queue.Queue = queue.Queue()
self._running = False
self._write_thread: Optional[threading.Thread] = None
self._cleanup_thread: Optional[threading.Thread] = None
self._init_directories()
self._init_database()
self._start_background_threads()
self._initialized = True
logger.info(f"SQLite 数据库初始化成功: {config.db_path}")
def _init_directories(self):
"""初始化目录"""
Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True)
Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)
def _init_database(self):
"""初始化数据库表"""
self._conn = sqlite3.connect(
self.config.db_path,
check_same_thread=False,
timeout=30.0
)
if self.config.wal_mode:
cursor = self._conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA cache_size=-64000;")
self._conn.commit()
cursor = self._conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_id TEXT UNIQUE NOT NULL,
camera_id TEXT NOT NULL,
roi_id TEXT NOT NULL,
alert_type TEXT NOT NULL,
target_class TEXT,
confidence REAL,
bbox TEXT,
message TEXT,
image_path TEXT,
status TEXT DEFAULT 'pending',
created_at TEXT NOT NULL,
processed_at TEXT,
duration_minutes REAL
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_camera
ON alert_records(camera_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_created
ON alert_records(created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_status
ON alert_records(status)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS camera_configs (
camera_id TEXT PRIMARY KEY,
rtsp_url TEXT NOT NULL,
camera_name TEXT,
status BOOLEAN DEFAULT 1,
enabled BOOLEAN DEFAULT 1,
location TEXT,
extra_params TEXT,
updated_at TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS roi_configs (
roi_id TEXT PRIMARY KEY,
camera_id TEXT NOT NULL,
roi_type TEXT NOT NULL,
coordinates TEXT NOT NULL,
algorithm_type TEXT NOT NULL,
alert_threshold INTEGER DEFAULT 3,
alert_cooldown INTEGER DEFAULT 300,
enabled BOOLEAN DEFAULT 1,
extra_params TEXT,
working_hours TEXT,
confirm_on_duty_sec INTEGER DEFAULT 10,
confirm_leave_sec INTEGER DEFAULT 10,
cooldown_sec INTEGER DEFAULT 300,
target_class TEXT DEFAULT 'person',
updated_at TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS config_update_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
config_type TEXT NOT NULL,
config_id TEXT,
old_value TEXT,
new_value TEXT,
updated_by TEXT,
updated_at TEXT
)
""")
self._conn.commit()
def _start_background_threads(self):
"""启动后台线程"""
self._running = True
self._write_thread = threading.Thread(
target=self._write_worker,
name="SQLiteWrite",
daemon=True
)
self._write_thread.start()
self._cleanup_thread = threading.Thread(
target=self._cleanup_worker,
name="SQLiteCleanup",
daemon=True
)
self._cleanup_thread.start()
def _write_worker(self):
"""异步写入工作线程"""
batch = []
last_flush = time.time()
while self._running:
try:
try:
item = self._write_queue.get(timeout=1.0)
batch.append(item)
except queue.Empty:
pass
should_flush = (
len(batch) >= self.config.batch_size or
time.time() - last_flush >= self.config.flush_interval
)
if batch and (should_flush or len(batch) >= 1000):
self._flush_batch(batch)
batch.clear()
last_flush = time.time()
except Exception as e:
logger.error(f"SQLite 写入异常: {e}")
if batch:
self._flush_batch(batch)
def _flush_batch(self, batch: List[Dict[str, Any]]):
"""批量写入数据库"""
try:
cursor = self._conn.cursor()
for record in batch:
cursor.execute("""
INSERT OR REPLACE INTO alert_records (
alert_id, camera_id, roi_id, alert_type,
target_class, confidence, bbox, message,
image_path, status, created_at, processed_at,
duration_minutes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record['alert_id'],
record['camera_id'],
record['roi_id'],
record['alert_type'],
record.get('target_class'),
record.get('confidence'),
record.get('bbox'),
record.get('message'),
record.get('image_path'),
record.get('status', 'pending'),
record['created_at'],
record.get('processed_at'),
record.get('duration_minutes'),
))
self._conn.commit()
logger.debug(f"批量写入 {len(batch)} 条记录")
except Exception as e:
logger.error(f"批量写入失败: {e}")
def _cleanup_worker(self):
"""清理工作线程(每天执行一次)"""
while self._running:
try:
time.sleep(3600)
if self._running:
self.cleanup_old_data()
except Exception as e:
logger.error(f"数据清理异常: {e}")
def queue_alert(self, alert: AlertRecord):
"""将告警加入写入队列"""
record = {
'alert_id': alert.alert_id,
'camera_id': alert.camera_id,
'roi_id': alert.roi_id,
'alert_type': alert.alert_type,
'target_class': alert.target_class,
'confidence': alert.confidence,
'bbox': str(alert.bbox) if alert.bbox else None,
'message': alert.message,
'image_path': alert.image_path,
'status': alert.status,
'created_at': alert.created_at.isoformat(),
'processed_at': alert.processed_at.isoformat() if alert.processed_at else None,
'duration_minutes': alert.duration_minutes,
}
self._write_queue.put(record)
def get_alerts(
self,
camera_id: Optional[str] = None,
status: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""查询告警记录"""
cursor = self._conn.cursor()
query = "SELECT * FROM alert_records WHERE 1=1"
params = []
if camera_id:
query += " AND camera_id = ?"
params.append(camera_id)
if status:
query += " AND status = ?"
params.append(status)
if start_time:
query += " AND created_at >= ?"
params.append(start_time.isoformat())
if end_time:
query += " AND created_at <= ?"
params.append(end_time.isoformat())
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type',
'target_class', 'confidence', 'bbox', 'message', 'image_path',
'status', 'created_at', 'processed_at']
return [dict(zip(columns, row)) for row in rows]
def update_status(self, alert_id: str, status: str) -> bool:
"""更新告警状态"""
try:
cursor = self._conn.cursor()
cursor.execute("""
UPDATE alert_records
SET status = ?, processed_at = ?
WHERE alert_id = ?
""", (status, datetime.now().isoformat(), alert_id))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"更新状态失败: {e}")
return False
def cleanup_old_data(self):
"""清理过期数据"""
try:
cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat()
cursor = self._conn.cursor()
cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,))
images = cursor.fetchall()
for (img_path,) in images:
if img_path and os.path.exists(img_path):
try:
os.remove(img_path)
except Exception:
pass
cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,))
deleted = cursor.rowcount
self._conn.commit()
logger.info(f"清理完成: 删除 {deleted} 条过期记录")
return deleted
except Exception as e:
logger.error(f"数据清理失败: {e}")
return 0
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
cursor = self._conn.cursor()
cursor.execute("SELECT COUNT(*) FROM alert_records")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'")
pending = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?",
((datetime.now() - timedelta(hours=24)).isoformat(),))
today = cursor.fetchone()[0]
db_size = os.path.getsize(self.config.db_path) / (1024 * 1024)
return {
"total_alerts": total,
"pending_alerts": pending,
"today_alerts": today,
"db_size_mb": round(db_size, 2),
"queue_size": self._write_queue.qsize(),
"retention_days": self.config.retention_days,
}
def close(self):
"""关闭数据库"""
self._running = False
if self._write_thread and self._write_thread.is_alive():
self._write_thread.join(timeout=10)
if self._conn:
self._conn.close()
logger.info("SQLite 数据库已关闭")
def save_camera_config(self, camera_id: str, rtsp_url: str, **kwargs) -> bool:
"""保存摄像头配置"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO camera_configs (
camera_id, rtsp_url, camera_name, status, enabled,
location, extra_params, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
camera_id, rtsp_url,
kwargs.get('camera_name'),
kwargs.get('status', True),
kwargs.get('enabled', True),
kwargs.get('location'),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存摄像头配置失败: {e}")
return False
def get_camera_config(self, camera_id: str) -> Optional[Dict[str, Any]]:
"""获取摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
row = cursor.fetchone()
if row:
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'extra_params', 'updated_at']
return dict(zip(columns, row))
return None
except Exception as e:
logger.error(f"获取摄像头配置失败: {e}")
return None
def get_all_camera_configs(self) -> List[Dict[str, Any]]:
"""获取所有摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'extra_params', 'updated_at']
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取所有摄像头配置失败: {e}")
return []
def delete_camera_config(self, camera_id: str) -> bool:
"""删除摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM camera_configs WHERE camera_id = ?", (camera_id,))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除摄像头配置失败: {e}")
return False
def save_roi_config(self, roi_id: str, camera_id: str, roi_type: str,
coordinates: List, algorithm_type: str, **kwargs) -> bool:
"""保存ROI配置"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO roi_configs (
roi_id, camera_id, roi_type, coordinates, algorithm_type,
alert_threshold, alert_cooldown, enabled, extra_params,
working_hours, confirm_on_duty_sec, confirm_leave_sec,
cooldown_sec, target_class, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
roi_id, camera_id, roi_type, str(coordinates), algorithm_type,
kwargs.get('alert_threshold', 3),
kwargs.get('alert_cooldown', 300),
kwargs.get('enabled', True),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
str(kwargs.get('working_hours')) if kwargs.get('working_hours') else None,
kwargs.get('confirm_on_duty_sec', 10),
kwargs.get('confirm_leave_sec', 10),
kwargs.get('cooldown_sec', 300),
kwargs.get('target_class', 'person'),
now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存ROI配置失败: {e}")
return False
def get_roi_config(self, roi_id: str) -> Optional[Dict[str, Any]]:
"""获取ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs WHERE roi_id = ?", (roi_id,))
row = cursor.fetchone()
if row:
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'algorithm_type', 'alert_threshold', 'alert_cooldown',
'enabled', 'extra_params', 'working_hours',
'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec',
'target_class', 'updated_at']
result = dict(zip(columns, row))
try:
result['coordinates'] = eval(result['coordinates'])
except:
pass
try:
if result.get('working_hours'):
result['working_hours'] = eval(result['working_hours'])
except:
pass
return result
return None
except Exception as e:
logger.error(f"获取ROI配置失败: {e}")
return None
def get_rois_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
"""获取指定摄像头的所有ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,))
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'algorithm_type', 'alert_threshold', 'alert_cooldown',
'enabled', 'extra_params', 'working_hours',
'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec',
'target_class', 'updated_at']
results = []
for row in cursor.fetchall():
r = dict(zip(columns, row))
try:
r['coordinates'] = eval(r['coordinates'])
except:
pass
try:
if r.get('working_hours'):
r['working_hours'] = eval(r['working_hours'])
except:
pass
results.append(r)
return results
except Exception as e:
logger.error(f"获取ROI配置失败: {e}")
return []
def get_all_roi_configs(self) -> List[Dict[str, Any]]:
"""获取所有ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs ORDER BY camera_id, roi_id")
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'algorithm_type', 'alert_threshold', 'alert_cooldown',
'enabled', 'extra_params', 'working_hours',
'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec',
'target_class', 'updated_at']
results = []
for row in cursor.fetchall():
r = dict(zip(columns, row))
try:
r['coordinates'] = eval(r['coordinates'])
except:
pass
try:
if r.get('working_hours'):
r['working_hours'] = eval(r['working_hours'])
except:
pass
results.append(r)
return results
except Exception as e:
logger.error(f"获取所有ROI配置失败: {e}")
return []
def delete_roi_config(self, roi_id: str) -> bool:
"""删除ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM roi_configs WHERE roi_id = ?", (roi_id,))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除ROI配置失败: {e}")
return False
def log_config_update(
self,
config_type: str,
config_id: Optional[str],
old_value: Any,
new_value: Any,
updated_by: str = "system"
):
"""记录配置更新日志"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT INTO config_update_log (
config_type, config_id, old_value, new_value, updated_by, updated_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (
config_type,
config_id,
str(old_value) if old_value else None,
str(new_value) if new_value else None,
updated_by,
now
))
self._conn.commit()
logger.info(f"配置更新日志已记录: {config_type}/{config_id}")
except Exception as e:
logger.error(f"记录配置更新日志失败: {e}")
def get_config_update_log(
self,
config_type: Optional[str] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""获取配置更新日志"""
try:
cursor = self._conn.cursor()
query = "SELECT * FROM config_update_log WHERE 1=1"
params = []
if config_type:
query += " AND config_type = ?"
params.append(config_type)
query += " ORDER BY id DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
columns = ['id', 'config_type', 'config_id', 'old_value', 'new_value',
'updated_by', 'updated_at']
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取配置更新日志失败: {e}")
return []
def get_sqlite_manager() -> SQLiteManager:
"""获取 SQLite 管理器单例"""
return SQLiteManager()