- CameraInfo 模型添加 area_id 字段 - SQLite 表增加 area_id 列及迁移 - config_sync 同步 area_id 到本地 - 告警 ext_data 携带 area_id - 截图处理器使用独立 Redis 连接,避免与配置同步阻塞冲突 - get_all_camera_configs 使用 cursor.description 动态获取列名 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
976 lines
36 KiB
Python
976 lines
36 KiB
Python
"""
|
||
SQLite 数据库模块
|
||
边缘AI推理服务的本地数据存储
|
||
|
||
特性:
|
||
- WAL 模式(Write-Ahead Logging)提升写入性能
|
||
- 异步写入策略
|
||
- 滚动清理机制(保留7天数据)
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import sqlite3
|
||
import threading
|
||
import queue
|
||
import time
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from typing import Any, Dict, List, Optional, Generator
|
||
|
||
|
||
def _normalize_coordinates(coords):
|
||
"""将坐标统一为 [[x,y],...] 格式,兼容 [{'x':..,'y':..},...] 格式"""
|
||
if isinstance(coords, str):
|
||
try:
|
||
coords = eval(coords)
|
||
except:
|
||
return coords
|
||
if isinstance(coords, list) and coords and isinstance(coords[0], dict):
|
||
return [[p.get("x", 0), p.get("y", 0)] for p in coords]
|
||
return coords
|
||
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class StorageConfig:
|
||
"""存储配置类"""
|
||
db_path: str = "./data/security_events.db"
|
||
image_dir: str = "./data/captures"
|
||
retention_days: int = 7
|
||
wal_mode: bool = True
|
||
batch_size: int = 100
|
||
flush_interval: float = 5.0
|
||
|
||
|
||
@dataclass
|
||
class AlertRecord:
|
||
"""告警记录"""
|
||
alert_id: str
|
||
camera_id: str
|
||
roi_id: str
|
||
bind_id: Optional[str] = None # 关联 roi_algo_bind 表
|
||
alert_type: str = "detection"
|
||
target_class: Optional[str] = None
|
||
confidence: Optional[float] = None
|
||
bbox: Optional[List[float]] = None
|
||
message: Optional[str] = None
|
||
image_path: Optional[str] = None
|
||
status: str = "pending"
|
||
created_at: datetime = field(default_factory=datetime.now)
|
||
processed_at: Optional[datetime] = None
|
||
duration_minutes: Optional[float] = None
|
||
detections: Optional[str] = None # JSON格式的检测结果
|
||
|
||
|
||
class SQLiteManager:
|
||
"""SQLite 数据库管理器"""
|
||
|
||
_instance = None
|
||
_lock = threading.Lock()
|
||
|
||
def __new__(cls, config: Optional[StorageConfig] = None):
|
||
if cls._instance is None:
|
||
with cls._lock:
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self, config: Optional[StorageConfig] = None):
|
||
if self._initialized:
|
||
return
|
||
|
||
if config is None:
|
||
config = StorageConfig()
|
||
|
||
self.config = config
|
||
self._conn: Optional[sqlite3.Connection] = None
|
||
self._write_queue: queue.Queue = queue.Queue()
|
||
self._running = False
|
||
self._write_thread: Optional[threading.Thread] = None
|
||
self._cleanup_thread: Optional[threading.Thread] = None
|
||
|
||
self._init_directories()
|
||
self._init_database()
|
||
self._start_background_threads()
|
||
|
||
self._initialized = True
|
||
logger.info(f"SQLite 数据库初始化成功: {config.db_path}")
|
||
|
||
def _init_directories(self):
|
||
"""初始化目录"""
|
||
Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True)
|
||
Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)
|
||
|
||
def _init_database(self):
|
||
"""初始化数据库表"""
|
||
self._conn = sqlite3.connect(
|
||
self.config.db_path,
|
||
check_same_thread=False,
|
||
timeout=30.0
|
||
)
|
||
|
||
if self.config.wal_mode:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("PRAGMA journal_mode=WAL;")
|
||
cursor.execute("PRAGMA synchronous=NORMAL;")
|
||
cursor.execute("PRAGMA cache_size=-64000;")
|
||
self._conn.commit()
|
||
|
||
cursor = self._conn.cursor()
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS algorithm_registry (
|
||
algo_code TEXT PRIMARY KEY,
|
||
algo_name TEXT NOT NULL,
|
||
target_class TEXT DEFAULT 'person',
|
||
param_schema TEXT,
|
||
description TEXT,
|
||
is_active BOOLEAN DEFAULT 1,
|
||
created_at TEXT,
|
||
updated_at TEXT
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_algo_active
|
||
ON algorithm_registry(is_active)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS roi_configs (
|
||
roi_id TEXT PRIMARY KEY,
|
||
camera_id TEXT NOT NULL,
|
||
roi_type TEXT NOT NULL,
|
||
coordinates TEXT NOT NULL,
|
||
enabled BOOLEAN DEFAULT 1,
|
||
priority INTEGER DEFAULT 0,
|
||
extra_params TEXT,
|
||
updated_at TEXT
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_roi_camera
|
||
ON roi_configs(camera_id)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS roi_algo_bind (
|
||
bind_id TEXT PRIMARY KEY,
|
||
roi_id TEXT NOT NULL,
|
||
algo_code TEXT NOT NULL,
|
||
params TEXT NOT NULL,
|
||
priority INTEGER DEFAULT 0,
|
||
enabled BOOLEAN DEFAULT 1,
|
||
created_at TEXT,
|
||
updated_at TEXT,
|
||
FOREIGN KEY (roi_id) REFERENCES roi_configs(roi_id) ON DELETE CASCADE,
|
||
FOREIGN KEY (algo_code) REFERENCES algorithm_registry(algo_code) ON DELETE RESTRICT
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_bind_roi
|
||
ON roi_algo_bind(roi_id)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_bind_algo
|
||
ON roi_algo_bind(algo_code)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS alert_records (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
alert_id TEXT UNIQUE NOT NULL,
|
||
camera_id TEXT NOT NULL,
|
||
roi_id TEXT NOT NULL,
|
||
bind_id TEXT,
|
||
alert_type TEXT NOT NULL,
|
||
target_class TEXT,
|
||
confidence REAL,
|
||
bbox TEXT,
|
||
message TEXT,
|
||
image_path TEXT,
|
||
status TEXT DEFAULT 'pending',
|
||
created_at TEXT NOT NULL,
|
||
processed_at TEXT,
|
||
duration_minutes REAL,
|
||
detections TEXT,
|
||
FOREIGN KEY (bind_id) REFERENCES roi_algo_bind(bind_id) ON DELETE SET NULL
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_alert_camera
|
||
ON alert_records(camera_id)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_alert_created
|
||
ON alert_records(created_at)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_alert_status
|
||
ON alert_records(status)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_alert_bind
|
||
ON alert_records(bind_id)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS camera_configs (
|
||
camera_id TEXT PRIMARY KEY,
|
||
rtsp_url TEXT NOT NULL,
|
||
camera_name TEXT,
|
||
status BOOLEAN DEFAULT 1,
|
||
enabled BOOLEAN DEFAULT 1,
|
||
location TEXT,
|
||
roi_group_id TEXT,
|
||
extra_params TEXT,
|
||
area_id INTEGER,
|
||
updated_at TEXT
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS config_update_log (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
config_type TEXT NOT NULL,
|
||
config_id TEXT,
|
||
old_value TEXT,
|
||
new_value TEXT,
|
||
updated_by TEXT,
|
||
updated_at TEXT
|
||
)
|
||
""")
|
||
|
||
self._conn.commit()
|
||
|
||
# 迁移:为已有数据库添加 area_id 列
|
||
try:
|
||
cursor.execute("ALTER TABLE camera_configs ADD COLUMN area_id INTEGER")
|
||
self._conn.commit()
|
||
except Exception:
|
||
pass # 列已存在,忽略
|
||
|
||
self._init_default_algorithms()
|
||
|
||
def _init_default_algorithms(self):
|
||
"""初始化默认算法配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
|
||
algorithms = [
|
||
{
|
||
'algo_code': 'leave_post',
|
||
'algo_name': '离岗检测',
|
||
'target_class': 'person',
|
||
'param_schema': json.dumps({
|
||
"confirm_on_duty_sec": {"type": "int", "default": 10, "min": 1},
|
||
"confirm_leave_sec": {"type": "int", "default": 30, "min": 1},
|
||
"cooldown_sec": {"type": "int", "default": 600, "min": 0},
|
||
"working_hours": {"type": "list", "default": []},
|
||
}),
|
||
'description': '检测人员是否在岗,支持工作时间段配置'
|
||
},
|
||
{
|
||
'algo_code': 'intrusion',
|
||
'algo_name': '周界入侵检测',
|
||
'target_class': 'person',
|
||
'param_schema': json.dumps({
|
||
"cooldown_seconds": {"type": "int", "default": 300, "min": 0},
|
||
"confirm_seconds": {"type": "int", "default": 5, "min": 1},
|
||
}),
|
||
'description': '检测人员进入指定区域,支持确认时间和冷却时间配置'
|
||
},
|
||
{
|
||
'algo_code': 'crowd_detection',
|
||
'algo_name': '人群聚集检测',
|
||
'target_class': 'person',
|
||
'param_schema': json.dumps({
|
||
"max_count": {"type": "int", "default": 10, "min": 1},
|
||
"cooldown_seconds": {"type": "int", "default": 300, "min": 0},
|
||
}),
|
||
'description': '检测区域内人员数量是否超过阈值'
|
||
},
|
||
]
|
||
|
||
for algo in algorithms:
|
||
cursor.execute("""
|
||
INSERT OR IGNORE INTO algorithm_registry (
|
||
algo_code, algo_name, target_class, param_schema, description,
|
||
is_active, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, 1, ?, ?)
|
||
""", (
|
||
algo['algo_code'], algo['algo_name'], algo['target_class'],
|
||
algo['param_schema'], algo['description'],
|
||
datetime.now().isoformat(), datetime.now().isoformat()
|
||
))
|
||
|
||
self._conn.commit()
|
||
logger.info(f"已初始化 {len(algorithms)} 个默认算法配置")
|
||
|
||
except Exception as e:
|
||
logger.error(f"初始化默认算法失败: {e}")
|
||
|
||
def _start_background_threads(self):
|
||
"""启动后台线程"""
|
||
self._running = True
|
||
|
||
self._write_thread = threading.Thread(
|
||
target=self._write_worker,
|
||
name="SQLiteWrite",
|
||
daemon=True
|
||
)
|
||
self._write_thread.start()
|
||
|
||
self._cleanup_thread = threading.Thread(
|
||
target=self._cleanup_worker,
|
||
name="SQLiteCleanup",
|
||
daemon=True
|
||
)
|
||
self._cleanup_thread.start()
|
||
|
||
def _write_worker(self):
|
||
"""异步写入工作线程"""
|
||
batch = []
|
||
last_flush = time.time()
|
||
|
||
while self._running:
|
||
try:
|
||
try:
|
||
item = self._write_queue.get(timeout=1.0)
|
||
batch.append(item)
|
||
except queue.Empty:
|
||
pass
|
||
|
||
should_flush = (
|
||
len(batch) >= self.config.batch_size or
|
||
time.time() - last_flush >= self.config.flush_interval
|
||
)
|
||
|
||
if batch and (should_flush or len(batch) >= 1000):
|
||
self._flush_batch(batch)
|
||
batch.clear()
|
||
last_flush = time.time()
|
||
|
||
except Exception as e:
|
||
logger.error(f"SQLite 写入异常: {e}")
|
||
|
||
if batch:
|
||
self._flush_batch(batch)
|
||
|
||
def _flush_batch(self, batch: List[Dict[str, Any]]):
|
||
"""批量写入数据库"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
|
||
for record in batch:
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO alert_records (
|
||
alert_id, camera_id, roi_id, bind_id, alert_type,
|
||
target_class, confidence, bbox, message,
|
||
image_path, status, created_at, processed_at,
|
||
duration_minutes, detections
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
record['alert_id'],
|
||
record['camera_id'],
|
||
record['roi_id'],
|
||
record.get('bind_id'),
|
||
record['alert_type'],
|
||
record.get('target_class'),
|
||
record.get('confidence'),
|
||
record.get('bbox'),
|
||
record.get('message'),
|
||
record.get('image_path'),
|
||
record.get('status', 'pending'),
|
||
record['created_at'],
|
||
record.get('processed_at'),
|
||
record.get('duration_minutes'),
|
||
record.get('detections'),
|
||
))
|
||
|
||
self._conn.commit()
|
||
logger.debug(f"批量写入 {len(batch)} 条记录")
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量写入失败: {e}")
|
||
|
||
def _cleanup_worker(self):
|
||
"""清理工作线程(每天执行一次)"""
|
||
while self._running:
|
||
try:
|
||
time.sleep(3600)
|
||
if self._running:
|
||
self.cleanup_old_data()
|
||
except Exception as e:
|
||
logger.error(f"数据清理异常: {e}")
|
||
|
||
def queue_alert(self, alert: AlertRecord):
|
||
"""将告警加入写入队列"""
|
||
record = {
|
||
'alert_id': alert.alert_id,
|
||
'camera_id': alert.camera_id,
|
||
'roi_id': alert.roi_id,
|
||
'bind_id': alert.bind_id,
|
||
'alert_type': alert.alert_type,
|
||
'target_class': alert.target_class,
|
||
'confidence': alert.confidence,
|
||
'bbox': str(alert.bbox) if alert.bbox else None,
|
||
'message': alert.message,
|
||
'image_path': alert.image_path,
|
||
'status': alert.status,
|
||
'created_at': alert.created_at.isoformat(),
|
||
'processed_at': alert.processed_at.isoformat() if alert.processed_at else None,
|
||
'duration_minutes': alert.duration_minutes,
|
||
'detections': alert.detections,
|
||
}
|
||
self._write_queue.put(record)
|
||
|
||
def get_alerts(
|
||
self,
|
||
camera_id: Optional[str] = None,
|
||
status: Optional[str] = None,
|
||
start_time: Optional[datetime] = None,
|
||
end_time: Optional[datetime] = None,
|
||
limit: int = 100
|
||
) -> List[Dict[str, Any]]:
|
||
"""查询告警记录"""
|
||
cursor = self._conn.cursor()
|
||
|
||
query = "SELECT * FROM alert_records WHERE 1=1"
|
||
params = []
|
||
|
||
if camera_id:
|
||
query += " AND camera_id = ?"
|
||
params.append(camera_id)
|
||
|
||
if status:
|
||
query += " AND status = ?"
|
||
params.append(status)
|
||
|
||
if start_time:
|
||
query += " AND created_at >= ?"
|
||
params.append(start_time.isoformat())
|
||
|
||
if end_time:
|
||
query += " AND created_at <= ?"
|
||
params.append(end_time.isoformat())
|
||
|
||
query += " ORDER BY created_at DESC LIMIT ?"
|
||
params.append(limit)
|
||
|
||
cursor.execute(query, params)
|
||
rows = cursor.fetchall()
|
||
|
||
columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type',
|
||
'target_class', 'confidence', 'bbox', 'message', 'image_path',
|
||
'status', 'created_at', 'processed_at']
|
||
|
||
return [dict(zip(columns, row)) for row in rows]
|
||
|
||
def update_status(self, alert_id: str, status: str) -> bool:
|
||
"""更新告警状态"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE alert_records
|
||
SET status = ?, processed_at = ?
|
||
WHERE alert_id = ?
|
||
""", (status, datetime.now().isoformat(), alert_id))
|
||
self._conn.commit()
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"更新状态失败: {e}")
|
||
return False
|
||
|
||
def cleanup_old_data(self):
|
||
"""清理过期数据"""
|
||
try:
|
||
cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat()
|
||
|
||
cursor = self._conn.cursor()
|
||
|
||
cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,))
|
||
images = cursor.fetchall()
|
||
for (img_path,) in images:
|
||
if img_path and os.path.exists(img_path):
|
||
try:
|
||
os.remove(img_path)
|
||
except Exception:
|
||
pass
|
||
|
||
cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,))
|
||
deleted = cursor.rowcount
|
||
self._conn.commit()
|
||
|
||
logger.info(f"清理完成: 删除 {deleted} 条过期记录")
|
||
return deleted
|
||
|
||
except Exception as e:
|
||
logger.error(f"数据清理失败: {e}")
|
||
return 0
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
|
||
"""获取统计信息"""
|
||
cursor = self._conn.cursor()
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM alert_records")
|
||
total = cursor.fetchone()[0]
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'")
|
||
pending = cursor.fetchone()[0]
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?",
|
||
((datetime.now() - timedelta(hours=24)).isoformat(),))
|
||
today = cursor.fetchone()[0]
|
||
|
||
db_size = os.path.getsize(self.config.db_path) / (1024 * 1024)
|
||
|
||
return {
|
||
"total_alerts": total,
|
||
"pending_alerts": pending,
|
||
"today_alerts": today,
|
||
"db_size_mb": round(db_size, 2),
|
||
"queue_size": self._write_queue.qsize(),
|
||
"retention_days": self.config.retention_days,
|
||
}
|
||
|
||
def close(self):
|
||
"""关闭数据库"""
|
||
self._running = False
|
||
|
||
if self._write_thread and self._write_thread.is_alive():
|
||
self._write_thread.join(timeout=10)
|
||
|
||
if self._conn:
|
||
self._conn.close()
|
||
|
||
logger.info("SQLite 数据库已关闭")
|
||
|
||
def save_camera_config(self, camera_id: str, rtsp_url: str, **kwargs) -> bool:
|
||
"""保存摄像头配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO camera_configs (
|
||
camera_id, rtsp_url, camera_name, status, enabled,
|
||
location, roi_group_id, extra_params, area_id, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
camera_id, rtsp_url,
|
||
kwargs.get('camera_name'),
|
||
kwargs.get('status', True),
|
||
kwargs.get('enabled', True),
|
||
kwargs.get('location'),
|
||
kwargs.get('roi_group_id'),
|
||
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
|
||
kwargs.get('area_id'),
|
||
now
|
||
))
|
||
self._conn.commit()
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存摄像头配置失败: {e}")
|
||
return False
|
||
|
||
def get_camera_config(self, camera_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取摄像头配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
|
||
row = cursor.fetchone()
|
||
if row:
|
||
columns = [desc[0] for desc in cursor.description]
|
||
return dict(zip(columns, row))
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取摄像头配置失败: {e}")
|
||
return None
|
||
|
||
def get_all_camera_configs(self) -> List[Dict[str, Any]]:
|
||
"""获取所有摄像头配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
|
||
columns = [desc[0] for desc in cursor.description]
|
||
return [dict(zip(columns, row)) for row in cursor.fetchall()]
|
||
except Exception as e:
|
||
logger.error(f"获取所有摄像头配置失败: {e}")
|
||
return []
|
||
|
||
def delete_camera_config(self, camera_id: str) -> bool:
|
||
"""删除摄像头配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("DELETE FROM camera_configs WHERE camera_id = ?", (camera_id,))
|
||
self._conn.commit()
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除摄像头配置失败: {e}")
|
||
return False
|
||
|
||
def save_roi_config(self, roi_id: str, camera_id: str, roi_type: str,
|
||
coordinates: List, **kwargs) -> bool:
|
||
"""保存ROI配置(空间信息,不包含业务参数)"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO roi_configs (
|
||
roi_id, camera_id, roi_type, coordinates,
|
||
enabled, priority, extra_params, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
roi_id, camera_id, roi_type, str(coordinates),
|
||
kwargs.get('enabled', True),
|
||
kwargs.get('priority', 0),
|
||
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
|
||
now
|
||
))
|
||
self._conn.commit()
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存ROI配置失败: {e}")
|
||
return False
|
||
|
||
def get_roi_config(self, roi_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取ROI配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM roi_configs WHERE roi_id = ?", (roi_id,))
|
||
row = cursor.fetchone()
|
||
if row:
|
||
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
|
||
'enabled', 'priority', 'extra_params', 'updated_at']
|
||
result = dict(zip(columns, row))
|
||
try:
|
||
result['coordinates'] = _normalize_coordinates(result['coordinates'])
|
||
except:
|
||
pass
|
||
return result
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取ROI配置失败: {e}")
|
||
return None
|
||
|
||
def get_rois_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
|
||
"""获取指定摄像头的所有ROI配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,))
|
||
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
|
||
'enabled', 'priority', 'extra_params', 'updated_at']
|
||
results = []
|
||
for row in cursor.fetchall():
|
||
r = dict(zip(columns, row))
|
||
try:
|
||
r['coordinates'] = _normalize_coordinates(r['coordinates'])
|
||
except:
|
||
pass
|
||
results.append(r)
|
||
return results
|
||
except Exception as e:
|
||
logger.error(f"获取ROI配置失败: {e}")
|
||
return []
|
||
|
||
def get_all_roi_configs(self) -> List[Dict[str, Any]]:
|
||
"""获取所有ROI配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM roi_configs ORDER BY camera_id, roi_id")
|
||
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
|
||
'enabled', 'priority', 'extra_params', 'updated_at']
|
||
results = []
|
||
for row in cursor.fetchall():
|
||
r = dict(zip(columns, row))
|
||
try:
|
||
r['coordinates'] = _normalize_coordinates(r['coordinates'])
|
||
except:
|
||
pass
|
||
results.append(r)
|
||
return results
|
||
except Exception as e:
|
||
logger.error(f"获取所有ROI配置失败: {e}")
|
||
return []
|
||
|
||
def delete_roi_config(self, roi_id: str) -> bool:
|
||
"""删除ROI配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("DELETE FROM roi_configs WHERE roi_id = ?", (roi_id,))
|
||
self._conn.commit()
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除ROI配置失败: {e}")
|
||
return False
|
||
|
||
def save_algorithm(self, algo_code: str, algo_name: str, target_class: str = "person",
|
||
param_schema: Optional[str] = None, description: Optional[str] = None,
|
||
is_active: bool = True) -> bool:
|
||
"""保存算法配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO algorithm_registry (
|
||
algo_code, algo_name, target_class, param_schema, description,
|
||
is_active, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
algo_code, algo_name, target_class, param_schema, description,
|
||
is_active, now, now
|
||
))
|
||
self._conn.commit()
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存算法配置失败: {e}")
|
||
return False
|
||
|
||
def get_algorithm(self, algo_code: str) -> Optional[Dict[str, Any]]:
|
||
"""获取算法配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM algorithm_registry WHERE algo_code = ?", (algo_code,))
|
||
row = cursor.fetchone()
|
||
if row:
|
||
columns = ['algo_code', 'algo_name', 'target_class', 'param_schema',
|
||
'description', 'is_active', 'created_at', 'updated_at']
|
||
result = dict(zip(columns, row))
|
||
try:
|
||
if result.get('param_schema'):
|
||
result['param_schema'] = json.loads(result['param_schema'])
|
||
except:
|
||
pass
|
||
return result
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取算法配置失败: {e}")
|
||
return None
|
||
|
||
def get_all_algorithms(self, active_only: bool = True) -> List[Dict[str, Any]]:
|
||
"""获取所有算法配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
query = "SELECT * FROM algorithm_registry"
|
||
if active_only:
|
||
query += " WHERE is_active = 1"
|
||
cursor.execute(query)
|
||
columns = ['algo_code', 'algo_name', 'target_class', 'param_schema',
|
||
'description', 'is_active', 'created_at', 'updated_at']
|
||
results = []
|
||
for row in cursor.fetchall():
|
||
r = dict(zip(columns, row))
|
||
try:
|
||
if r.get('param_schema'):
|
||
r['param_schema'] = json.loads(r['param_schema'])
|
||
except:
|
||
pass
|
||
results.append(r)
|
||
return results
|
||
except Exception as e:
|
||
logger.error(f"获取所有算法配置失败: {e}")
|
||
return []
|
||
|
||
def save_roi_algo_bind(self, bind_id: str, roi_id: str, algo_code: str,
|
||
params: Dict[str, Any], priority: int = 0,
|
||
enabled: bool = True) -> bool:
|
||
"""保存ROI与算法的绑定关系"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO roi_algo_bind (
|
||
bind_id, roi_id, algo_code, params, priority,
|
||
enabled, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
bind_id, roi_id, algo_code, json.dumps(params),
|
||
priority, enabled, now, now
|
||
))
|
||
self._conn.commit()
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存ROI算法绑定失败: {e}")
|
||
return False
|
||
|
||
def get_roi_algo_bind(self, bind_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取ROI算法绑定配置"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT * FROM roi_algo_bind WHERE bind_id = ?", (bind_id,))
|
||
row = cursor.fetchone()
|
||
if row:
|
||
columns = ['bind_id', 'roi_id', 'algo_code', 'params',
|
||
'priority', 'enabled', 'created_at', 'updated_at']
|
||
result = dict(zip(columns, row))
|
||
try:
|
||
if result.get('params'):
|
||
result['params'] = json.loads(result['params'])
|
||
except:
|
||
pass
|
||
return result
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取ROI算法绑定失败: {e}")
|
||
return None
|
||
|
||
def get_bindings_by_roi(self, roi_id: str) -> List[Dict[str, Any]]:
|
||
"""获取指定ROI的所有算法绑定"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("""
|
||
SELECT b.*, a.algo_name, a.target_class
|
||
FROM roi_algo_bind b
|
||
LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code
|
||
WHERE b.roi_id = ? AND b.enabled = 1
|
||
ORDER BY b.priority DESC
|
||
""", (roi_id,))
|
||
results = []
|
||
for row in cursor.fetchall():
|
||
result = dict(zip(
|
||
['bind_id', 'roi_id', 'algo_code', 'params', 'priority',
|
||
'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class'],
|
||
row
|
||
))
|
||
try:
|
||
if result.get('params'):
|
||
result['params'] = json.loads(result['params'])
|
||
except:
|
||
pass
|
||
results.append(result)
|
||
return results
|
||
except Exception as e:
|
||
logger.error(f"获取ROI算法绑定失败: {e}")
|
||
return []
|
||
|
||
def get_bindings_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
|
||
"""获取指定摄像头的所有ROI算法绑定"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("""
|
||
SELECT b.*, a.algo_name, a.target_class, r.roi_type, r.coordinates
|
||
FROM roi_algo_bind b
|
||
LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code
|
||
LEFT JOIN roi_configs r ON b.roi_id = r.roi_id
|
||
WHERE r.camera_id = ? AND b.enabled = 1 AND r.enabled = 1
|
||
ORDER BY r.priority DESC, b.priority DESC
|
||
""", (camera_id,))
|
||
results = []
|
||
for row in cursor.fetchall():
|
||
result = dict(zip(
|
||
['bind_id', 'roi_id', 'algo_code', 'params', 'priority',
|
||
'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class',
|
||
'roi_type', 'coordinates'],
|
||
row
|
||
))
|
||
try:
|
||
if result.get('params'):
|
||
result['params'] = json.loads(result['params'])
|
||
if result.get('coordinates'):
|
||
result['coordinates'] = _normalize_coordinates(result['coordinates'])
|
||
except:
|
||
pass
|
||
results.append(result)
|
||
return results
|
||
except Exception as e:
|
||
logger.error(f"获取摄像头算法绑定失败: {e}")
|
||
return []
|
||
|
||
def delete_roi_algo_bind(self, bind_id: str) -> bool:
|
||
"""删除ROI算法绑定"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("DELETE FROM roi_algo_bind WHERE bind_id = ?", (bind_id,))
|
||
self._conn.commit()
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除ROI算法绑定失败: {e}")
|
||
return False
|
||
|
||
def delete_bindings_by_roi(self, roi_id: str) -> int:
|
||
"""删除指定ROI的所有算法绑定"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("DELETE FROM roi_algo_bind WHERE roi_id = ?", (roi_id,))
|
||
self._conn.commit()
|
||
return cursor.rowcount
|
||
except Exception as e:
|
||
logger.error(f"删除ROI算法绑定失败: {e}")
|
||
return 0
|
||
|
||
def get_all_bind_ids(self) -> List[str]:
|
||
"""获取所有算法绑定的 bind_id 列表(用于清理孤立绑定)"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
cursor.execute("SELECT bind_id FROM roi_algo_bind")
|
||
return [row[0] for row in cursor.fetchall()]
|
||
except Exception as e:
|
||
logger.error(f"获取所有绑定ID失败: {e}")
|
||
return []
|
||
|
||
def log_config_update(
|
||
self,
|
||
config_type: str,
|
||
config_id: Optional[str],
|
||
old_value: Any,
|
||
new_value: Any,
|
||
updated_by: str = "system"
|
||
):
|
||
"""记录配置更新日志"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("""
|
||
INSERT INTO config_update_log (
|
||
config_type, config_id, old_value, new_value, updated_by, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
config_type,
|
||
config_id,
|
||
str(old_value) if old_value else None,
|
||
str(new_value) if new_value else None,
|
||
updated_by,
|
||
now
|
||
))
|
||
self._conn.commit()
|
||
logger.info(f"配置更新日志已记录: {config_type}/{config_id}")
|
||
except Exception as e:
|
||
logger.error(f"记录配置更新日志失败: {e}")
|
||
|
||
def get_config_update_log(
|
||
self,
|
||
config_type: Optional[str] = None,
|
||
limit: int = 100
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取配置更新日志"""
|
||
try:
|
||
cursor = self._conn.cursor()
|
||
query = "SELECT * FROM config_update_log WHERE 1=1"
|
||
params = []
|
||
if config_type:
|
||
query += " AND config_type = ?"
|
||
params.append(config_type)
|
||
query += " ORDER BY id DESC LIMIT ?"
|
||
params.append(limit)
|
||
cursor.execute(query, params)
|
||
columns = ['id', 'config_type', 'config_id', 'old_value', 'new_value',
|
||
'updated_by', 'updated_at']
|
||
return [dict(zip(columns, row)) for row in cursor.fetchall()]
|
||
except Exception as e:
|
||
logger.error(f"获取配置更新日志失败: {e}")
|
||
return []
|
||
|
||
|
||
def get_sqlite_manager() -> SQLiteManager:
|
||
"""获取 SQLite 管理器单例"""
|
||
return SQLiteManager()
|