feat: 重构存储策略为 SQLite

- 新增 config/database.py: SQLite 数据库管理器
  - WAL 模式提升写入性能
  - 异步批量写入队列
  - 滚动清理机制(保留7天)

- 新增 core/storage_manager.py: 图片存储管理
  - 异步保存抓拍图片
  - 本地缓存断网容灾
  - 按日期分目录存储

- 更新 config/settings.py: 添加 SQLite 配置
  - SQLiteConfig 数据类
  - 环境变量支持

- 更新 core/result_reporter.py: 适配新存储
  - 使用 SQLite 替代 MySQL
  - AlertInfo 数据类重构
  - 断网自动缓存到本地
This commit is contained in:
2026-01-30 11:34:51 +08:00
parent 6dc3442cc2
commit ccb021677c
11 changed files with 1120 additions and 874 deletions

View File

@@ -1,316 +1,375 @@
"""
数据库连接配置模块
提供MySQL数据库连接池管理和操作封装
SQLite 数据库模块
边缘AI推理服务的本地数据存储
特性:
- WAL 模式Write-Ahead Logging提升写入性能
- 异步写入策略
- 滚动清理机制保留7天数据
"""
import os
import sqlite3
import threading
import queue
import time
import logging
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Optional
from sqlalchemy import create_engine, Column, String, Boolean, Integer, Float, Text, JSON, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from config.settings import get_settings, DatabaseConfig
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
Base = declarative_base()
@dataclass
class StorageConfig:
"""存储配置类"""
db_path: str = "./data/security_events.db"
image_dir: str = "./data/captures"
retention_days: int = 7
wal_mode: bool = True
batch_size: int = 100
flush_interval: float = 5.0
class CameraInfo(Base):
"""摄像头信息表模型"""
__tablename__ = "camera_info"
id = Column(Integer, primary_key=True, autoincrement=True)
camera_id = Column(String(64), unique=True, nullable=False, index=True)
camera_name = Column(String(128), nullable=True)
rtsp_url = Column(String(512), nullable=False)
status = Column(Boolean, default=True)
enabled = Column(Boolean, default=True)
location = Column(String(256), nullable=True)
extra_params = Column(JSON, nullable=True)
created_at = Column(DateTime, nullable=True)
updated_at = Column(DateTime, nullable=True)
@dataclass
class AlertRecord:
"""告警记录"""
alert_id: str
camera_id: str
roi_id: str
alert_type: str
target_class: Optional[str] = None
confidence: Optional[float] = None
bbox: Optional[List[float]] = None
message: Optional[str] = None
image_path: Optional[str] = None
status: str = "pending"
created_at: datetime = field(default_factory=datetime.now)
processed_at: Optional[datetime] = None
class ROIConfigModel(Base):
"""ROI配置表模型"""
__tablename__ = "roi_config"
id = Column(Integer, primary_key=True, autoincrement=True)
roi_id = Column(String(64), unique=True, nullable=False, index=True)
camera_id = Column(String(64), nullable=False, index=True)
roi_type = Column(String(32), nullable=False) # 'polygon' or 'rectangle'
coordinates = Column(JSON, nullable=False) # 多边形顶点或矩形坐标
algorithm_type = Column(String(32), nullable=False) # 'leave_post', 'intrusion', etc.
alert_threshold = Column(Integer, default=3)
alert_cooldown = Column(Integer, default=300)
enabled = Column(Boolean, default=True)
extra_params = Column(JSON, nullable=True)
created_at = Column(DateTime, nullable=True)
updated_at = Column(DateTime, nullable=True)
class AlertRecord(Base):
"""告警记录表模型"""
__tablename__ = "alert_records"
id = Column(Integer, primary_key=True, autoincrement=True)
alert_id = Column(String(64), unique=True, nullable=False, index=True)
camera_id = Column(String(64), nullable=False, index=True)
roi_id = Column(String(64), nullable=False, index=True)
alert_type = Column(String(32), nullable=False)
target_class = Column(String(64), nullable=True)
confidence = Column(Float, nullable=True)
bbox = Column(JSON, nullable=True)
message = Column(Text, nullable=True)
screenshot = Column(Text, nullable=True) # Base64编码的截图
status = Column(String(32), default="pending")
created_at = Column(DateTime, nullable=True)
processed_at = Column(DateTime, nullable=True)
class DatabaseManager:
"""数据库连接管理器类"""
class SQLiteManager:
"""SQLite 数据库管理器"""
_instance = None
_engine = None
_session_factory = None
_available = False
_lock = threading.Lock()
def __new__(cls):
def __new__(cls, config: Optional[StorageConfig] = None):
if cls._instance is None:
cls._instance = super().__new__(cls)
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, config: Optional[DatabaseConfig] = None):
if self._engine is not None:
def __init__(self, config: Optional[StorageConfig] = None):
if self._initialized:
return
if config is None:
settings = get_settings()
config = settings.database
config = StorageConfig()
self._init_engine(config)
self.config = config
self._conn: Optional[sqlite3.Connection] = None
self._write_queue: queue.Queue = queue.Queue()
self._running = False
self._write_thread: Optional[threading.Thread] = None
self._cleanup_thread: Optional[threading.Thread] = None
self._init_directories()
self._init_database()
self._start_background_threads()
self._initialized = True
logger.info(f"SQLite 数据库初始化成功: {config.db_path}")
def _init_engine(self, config: DatabaseConfig):
"""初始化数据库引擎"""
try:
connection_string = (
f"mysql+pymysql://{config.username}:{config.password}"
f"@{config.host}:{config.port}/{config.database}"
f"?charset=utf8mb4"
def _init_directories(self):
"""初始化目录"""
Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True)
Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)
def _init_database(self):
"""初始化数据库表"""
self._conn = sqlite3.connect(
self.config.db_path,
check_same_thread=False,
timeout=30.0
)
if self.config.wal_mode:
cursor = self._conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA cache_size=-64000;")
self._conn.commit()
cursor = self._conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_id TEXT UNIQUE NOT NULL,
camera_id TEXT NOT NULL,
roi_id TEXT NOT NULL,
alert_type TEXT NOT NULL,
target_class TEXT,
confidence REAL,
bbox TEXT,
message TEXT,
image_path TEXT,
status TEXT DEFAULT 'pending',
created_at TEXT NOT NULL,
processed_at TEXT
)
self._engine = create_engine(
connection_string,
poolclass=QueuePool,
pool_size=config.pool_size,
pool_recycle=config.pool_recycle,
echo=config.echo,
pool_pre_ping=True,
max_overflow=5,
)
self._session_factory = sessionmaker(bind=self._engine)
test_connection = self._engine.connect()
test_connection.close()
self._available = True
logger.info(f"数据库引擎初始化成功: {config.host}:{config.port}/{config.database}")
except Exception as e:
self._available = False
logger.warning(f"数据库连接失败,服务将在无数据库模式下运行: {e}")
@property
def is_available(self) -> bool:
"""检查数据库是否可用"""
return self._available
@contextmanager
def get_session(self) -> Generator[Session, None, None]:
"""获取数据库会话上下文"""
if not self._available:
logger.warning("数据库不可用,跳过数据库操作")
yield None
return
""")
session = self._session_factory()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"数据库操作异常: {e}")
raise
finally:
session.close()
def get_camera_info(self, camera_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取摄像头信息"""
if not self._available:
logger.warning("数据库不可用,返回空摄像头列表")
return []
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_camera
ON alert_records(camera_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_created
ON alert_records(created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_status
ON alert_records(status)
""")
with self.get_session() as session:
if session is None:
return []
query = session.query(CameraInfo)
if camera_id:
query = query.filter(CameraInfo.camera_id == camera_id)
cameras = query.filter(CameraInfo.enabled == True).all()
result = []
for camera in cameras:
result.append({
"camera_id": camera.camera_id,
"camera_name": camera.camera_name,
"rtsp_url": camera.rtsp_url,
"status": camera.status,
"location": camera.location,
"extra_params": camera.extra_params,
})
return result
self._conn.commit()
def get_roi_configs(self, camera_id: Optional[str] = None,
roi_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取ROI配置"""
if not self._available:
logger.warning("数据库不可用返回空ROI配置列表")
return []
def _start_background_threads(self):
"""启动后台线程"""
self._running = True
with self.get_session() as session:
if session is None:
return []
query = session.query(ROIConfigModel)
if camera_id:
query = query.filter(ROIConfigModel.camera_id == camera_id)
if roi_id:
query = query.filter(ROIConfigModel.roi_id == roi_id)
query = query.filter(ROIConfigModel.enabled == True)
rois = query.all()
result = []
for roi in rois:
result.append({
"roi_id": roi.roi_id,
"camera_id": roi.camera_id,
"roi_type": roi.roi_type,
"coordinates": roi.coordinates,
"algorithm_type": roi.algorithm_type,
"alert_threshold": roi.alert_threshold,
"alert_cooldown": roi.alert_cooldown,
"extra_params": roi.extra_params,
})
return result
self._write_thread = threading.Thread(
target=self._write_worker,
name="SQLiteWrite",
daemon=True
)
self._write_thread.start()
self._cleanup_thread = threading.Thread(
target=self._cleanup_worker,
name="SQLiteCleanup",
daemon=True
)
self._cleanup_thread.start()
def save_alert_record(self, alert_data: Dict[str, Any]) -> bool:
"""保存告警记录"""
if not self._available:
logger.warning("数据库不可用,跳过保存告警记录")
return False
def _write_worker(self):
"""异步写入工作线程"""
batch = []
last_flush = time.time()
try:
with self.get_session() as session:
if session is None:
return False
alert = AlertRecord(
alert_id=alert_data.get("alert_id"),
camera_id=alert_data.get("camera_id"),
roi_id=alert_data.get("roi_id"),
alert_type=alert_data.get("alert_type"),
target_class=alert_data.get("target_class"),
confidence=alert_data.get("confidence"),
bbox=alert_data.get("bbox"),
message=alert_data.get("message"),
screenshot=alert_data.get("screenshot"),
status=alert_data.get("status", "pending"),
while self._running:
try:
try:
item = self._write_queue.get(timeout=1.0)
batch.append(item)
except queue.Empty:
pass
should_flush = (
len(batch) >= self.config.batch_size or
time.time() - last_flush >= self.config.flush_interval
)
session.add(alert)
session.flush()
logger.info(f"告警记录保存成功: {alert_data.get('alert_id')}")
return True
except Exception as e:
logger.error(f"保存告警记录失败: {e}")
return False
if batch and (should_flush or len(batch) >= 1000):
self._flush_batch(batch)
batch.clear()
last_flush = time.time()
except Exception as e:
logger.error(f"SQLite 写入异常: {e}")
if batch:
self._flush_batch(batch)
def update_alert_status(self, alert_id: str, status: str) -> bool:
def _flush_batch(self, batch: List[Dict[str, Any]]):
"""批量写入数据库"""
try:
cursor = self._conn.cursor()
for record in batch:
cursor.execute("""
INSERT OR REPLACE INTO alert_records (
alert_id, camera_id, roi_id, alert_type,
target_class, confidence, bbox, message,
image_path, status, created_at, processed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record['alert_id'],
record['camera_id'],
record['roi_id'],
record['alert_type'],
record.get('target_class'),
record.get('confidence'),
record.get('bbox'),
record.get('message'),
record.get('image_path'),
record.get('status', 'pending'),
record['created_at'],
record.get('processed_at'),
))
self._conn.commit()
logger.debug(f"批量写入 {len(batch)} 条记录")
except Exception as e:
logger.error(f"批量写入失败: {e}")
def _cleanup_worker(self):
"""清理工作线程(每天执行一次)"""
while self._running:
try:
time.sleep(3600)
if self._running:
self.cleanup_old_data()
except Exception as e:
logger.error(f"数据清理异常: {e}")
def queue_alert(self, alert: AlertRecord):
"""将告警加入写入队列"""
record = {
'alert_id': alert.alert_id,
'camera_id': alert.camera_id,
'roi_id': alert.roi_id,
'alert_type': alert.alert_type,
'target_class': alert.target_class,
'confidence': alert.confidence,
'bbox': str(alert.bbox) if alert.bbox else None,
'message': alert.message,
'image_path': alert.image_path,
'status': alert.status,
'created_at': alert.created_at.isoformat(),
'processed_at': alert.processed_at.isoformat() if alert.processed_at else None,
}
self._write_queue.put(record)
def get_alerts(
self,
camera_id: Optional[str] = None,
status: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""查询告警记录"""
cursor = self._conn.cursor()
query = "SELECT * FROM alert_records WHERE 1=1"
params = []
if camera_id:
query += " AND camera_id = ?"
params.append(camera_id)
if status:
query += " AND status = ?"
params.append(status)
if start_time:
query += " AND created_at >= ?"
params.append(start_time.isoformat())
if end_time:
query += " AND created_at <= ?"
params.append(end_time.isoformat())
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type',
'target_class', 'confidence', 'bbox', 'message', 'image_path',
'status', 'created_at', 'processed_at']
return [dict(zip(columns, row)) for row in rows]
def update_status(self, alert_id: str, status: str) -> bool:
"""更新告警状态"""
if not self._available:
logger.warning("数据库不可用,跳过更新告警状态")
return False
try:
with self.get_session() as session:
if session is None:
return False
from sqlalchemy import update
from datetime import datetime
stmt = update(AlertRecord).where(
AlertRecord.alert_id == alert_id
).values(
status=status,
processed_at=datetime.now()
)
session.execute(stmt)
logger.info(f"告警状态更新成功: {alert_id} -> {status}")
return True
cursor = self._conn.cursor()
cursor.execute("""
UPDATE alert_records
SET status = ?, processed_at = ?
WHERE alert_id = ?
""", (status, datetime.now().isoformat(), alert_id))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"更新告警状态失败: {e}")
logger.error(f"更新状态失败: {e}")
return False
def create_tables(self):
"""创建所有表"""
if not self._available:
logger.warning("数据库不可用,跳过创建表")
return
Base.metadata.create_all(self._engine)
logger.info("数据库表创建完成")
def cleanup_old_data(self):
"""清理过期数据"""
try:
cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat()
cursor = self._conn.cursor()
cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,))
images = cursor.fetchall()
for (img_path,) in images:
if img_path and os.path.exists(img_path):
try:
os.remove(img_path)
except Exception:
pass
cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,))
deleted = cursor.rowcount
self._conn.commit()
logger.info(f"清理完成: 删除 {deleted} 条过期记录")
return deleted
except Exception as e:
logger.error(f"数据清理失败: {e}")
return 0
def drop_tables(self):
"""删除所有表"""
if not self._available:
return
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
cursor = self._conn.cursor()
Base.metadata.drop_all(self._engine)
logger.info("数据库表删除完成")
cursor.execute("SELECT COUNT(*) FROM alert_records")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'")
pending = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?",
((datetime.now() - timedelta(hours=24)).isoformat(),))
today = cursor.fetchone()[0]
db_size = os.path.getsize(self.config.db_path) / (1024 * 1024)
return {
"total_alerts": total,
"pending_alerts": pending,
"today_alerts": today,
"db_size_mb": round(db_size, 2),
"queue_size": self._write_queue.qsize(),
"retention_days": self.config.retention_days,
}
def close(self):
"""关闭数据库连接"""
if self._engine:
self._engine.dispose()
logger.info("数据库连接已关闭")
"""关闭数据库"""
self._running = False
if self._write_thread and self._write_thread.is_alive():
self._write_thread.join(timeout=10)
if self._conn:
self._conn.close()
logger.info("SQLite 数据库已关闭")
def get_database_manager() -> DatabaseManager:
"""获取数据库管理器单例"""
return DatabaseManager()
def init_database():
"""初始化数据库"""
db_manager = get_database_manager()
db_manager.create_tables()
return db_manager
def get_sqlite_manager() -> SQLiteManager:
"""获取 SQLite 管理器单例"""
return SQLiteManager()