Files
security-ai-edge/config/database.py
16337 b6fba4639d fix(aiot): 修复离岗检测启动立即报警的三个Bug
Bug#1(严重): 无人帧不调用算法
- _batch_process_rois 中 len(boxes)>0 才调用 _handle_detections
- 导致离岗检测永远收不到"人走了"的信号
- 修复: 无论检测结果是否为空都调用算法
- 同时移除 _handle_detections 中 tracks 为空的 early return

Bug#2(高): WAITING 一帧就跳 ON_DUTY
- 检测到人第一帧就立即从 WAITING 跳到 ON_DUTY
- confirm_on_duty_sec 参数完全未被使用
- 修复: 新增 CONFIRMING 状态,需连续 10s 检测到人才确认上岗

Bug#3(中): confirm_leave_sec 默认值过短
- 默认 10 秒,用户预期 30 秒
- 修复: 所有默认值统一改为 30s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 10:01:20 +08:00

959 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SQLite 数据库模块
边缘AI推理服务的本地数据存储
特性:
- WAL 模式Write-Ahead Logging提升写入性能
- 异步写入策略
- 滚动清理机制保留7天数据
"""
import os
import json
import sqlite3
import threading
import queue
import time
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Generator
def _normalize_coordinates(coords):
"""将坐标统一为 [[x,y],...] 格式,兼容 [{'x':..,'y':..},...] 格式"""
if isinstance(coords, str):
try:
coords = eval(coords)
except:
return coords
if isinstance(coords, list) and coords and isinstance(coords[0], dict):
return [[p.get("x", 0), p.get("y", 0)] for p in coords]
return coords
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class StorageConfig:
"""存储配置类"""
db_path: str = "./data/security_events.db"
image_dir: str = "./data/captures"
retention_days: int = 7
wal_mode: bool = True
batch_size: int = 100
flush_interval: float = 5.0
@dataclass
class AlertRecord:
"""告警记录"""
alert_id: str
camera_id: str
roi_id: str
bind_id: Optional[str] = None # 关联 roi_algo_bind 表
alert_type: str = "detection"
target_class: Optional[str] = None
confidence: Optional[float] = None
bbox: Optional[List[float]] = None
message: Optional[str] = None
image_path: Optional[str] = None
status: str = "pending"
created_at: datetime = field(default_factory=datetime.now)
processed_at: Optional[datetime] = None
duration_minutes: Optional[float] = None
detections: Optional[str] = None # JSON格式的检测结果
class SQLiteManager:
"""SQLite 数据库管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls, config: Optional[StorageConfig] = None):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, config: Optional[StorageConfig] = None):
if self._initialized:
return
if config is None:
config = StorageConfig()
self.config = config
self._conn: Optional[sqlite3.Connection] = None
self._write_queue: queue.Queue = queue.Queue()
self._running = False
self._write_thread: Optional[threading.Thread] = None
self._cleanup_thread: Optional[threading.Thread] = None
self._init_directories()
self._init_database()
self._start_background_threads()
self._initialized = True
logger.info(f"SQLite 数据库初始化成功: {config.db_path}")
def _init_directories(self):
"""初始化目录"""
Path(self.config.db_path).parent.mkdir(parents=True, exist_ok=True)
Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)
def _init_database(self):
"""初始化数据库表"""
self._conn = sqlite3.connect(
self.config.db_path,
check_same_thread=False,
timeout=30.0
)
if self.config.wal_mode:
cursor = self._conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA cache_size=-64000;")
self._conn.commit()
cursor = self._conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS algorithm_registry (
algo_code TEXT PRIMARY KEY,
algo_name TEXT NOT NULL,
target_class TEXT DEFAULT 'person',
param_schema TEXT,
description TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TEXT,
updated_at TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_algo_active
ON algorithm_registry(is_active)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS roi_configs (
roi_id TEXT PRIMARY KEY,
camera_id TEXT NOT NULL,
roi_type TEXT NOT NULL,
coordinates TEXT NOT NULL,
enabled BOOLEAN DEFAULT 1,
priority INTEGER DEFAULT 0,
extra_params TEXT,
updated_at TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_roi_camera
ON roi_configs(camera_id)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS roi_algo_bind (
bind_id TEXT PRIMARY KEY,
roi_id TEXT NOT NULL,
algo_code TEXT NOT NULL,
params TEXT NOT NULL,
priority INTEGER DEFAULT 0,
enabled BOOLEAN DEFAULT 1,
created_at TEXT,
updated_at TEXT,
FOREIGN KEY (roi_id) REFERENCES roi_configs(roi_id) ON DELETE CASCADE,
FOREIGN KEY (algo_code) REFERENCES algorithm_registry(algo_code) ON DELETE RESTRICT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_bind_roi
ON roi_algo_bind(roi_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_bind_algo
ON roi_algo_bind(algo_code)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_id TEXT UNIQUE NOT NULL,
camera_id TEXT NOT NULL,
roi_id TEXT NOT NULL,
bind_id TEXT,
alert_type TEXT NOT NULL,
target_class TEXT,
confidence REAL,
bbox TEXT,
message TEXT,
image_path TEXT,
status TEXT DEFAULT 'pending',
created_at TEXT NOT NULL,
processed_at TEXT,
duration_minutes REAL,
detections TEXT,
FOREIGN KEY (bind_id) REFERENCES roi_algo_bind(bind_id) ON DELETE SET NULL
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_camera
ON alert_records(camera_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_created
ON alert_records(created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_status
ON alert_records(status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alert_bind
ON alert_records(bind_id)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS camera_configs (
camera_id TEXT PRIMARY KEY,
rtsp_url TEXT NOT NULL,
camera_name TEXT,
status BOOLEAN DEFAULT 1,
enabled BOOLEAN DEFAULT 1,
location TEXT,
roi_group_id TEXT,
extra_params TEXT,
updated_at TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS config_update_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
config_type TEXT NOT NULL,
config_id TEXT,
old_value TEXT,
new_value TEXT,
updated_by TEXT,
updated_at TEXT
)
""")
self._conn.commit()
self._init_default_algorithms()
def _init_default_algorithms(self):
"""初始化默认算法配置"""
try:
cursor = self._conn.cursor()
algorithms = [
{
'algo_code': 'leave_post',
'algo_name': '离岗检测',
'target_class': 'person',
'param_schema': json.dumps({
"confirm_on_duty_sec": {"type": "int", "default": 10, "min": 1},
"confirm_leave_sec": {"type": "int", "default": 30, "min": 1},
"cooldown_sec": {"type": "int", "default": 600, "min": 0},
"working_hours": {"type": "list", "default": []},
}),
'description': '检测人员是否在岗,支持工作时间段配置'
},
{
'algo_code': 'intrusion',
'algo_name': '周界入侵检测',
'target_class': 'person',
'param_schema': json.dumps({
"cooldown_seconds": {"type": "int", "default": 300, "min": 0},
"confirm_seconds": {"type": "int", "default": 5, "min": 1},
}),
'description': '检测人员进入指定区域,支持确认时间和冷却时间配置'
},
{
'algo_code': 'crowd_detection',
'algo_name': '人群聚集检测',
'target_class': 'person',
'param_schema': json.dumps({
"max_count": {"type": "int", "default": 10, "min": 1},
"cooldown_seconds": {"type": "int", "default": 300, "min": 0},
}),
'description': '检测区域内人员数量是否超过阈值'
},
]
for algo in algorithms:
cursor.execute("""
INSERT OR IGNORE INTO algorithm_registry (
algo_code, algo_name, target_class, param_schema, description,
is_active, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, 1, ?, ?)
""", (
algo['algo_code'], algo['algo_name'], algo['target_class'],
algo['param_schema'], algo['description'],
datetime.now().isoformat(), datetime.now().isoformat()
))
self._conn.commit()
logger.info(f"已初始化 {len(algorithms)} 个默认算法配置")
except Exception as e:
logger.error(f"初始化默认算法失败: {e}")
def _start_background_threads(self):
"""启动后台线程"""
self._running = True
self._write_thread = threading.Thread(
target=self._write_worker,
name="SQLiteWrite",
daemon=True
)
self._write_thread.start()
self._cleanup_thread = threading.Thread(
target=self._cleanup_worker,
name="SQLiteCleanup",
daemon=True
)
self._cleanup_thread.start()
def _write_worker(self):
"""异步写入工作线程"""
batch = []
last_flush = time.time()
while self._running:
try:
try:
item = self._write_queue.get(timeout=1.0)
batch.append(item)
except queue.Empty:
pass
should_flush = (
len(batch) >= self.config.batch_size or
time.time() - last_flush >= self.config.flush_interval
)
if batch and (should_flush or len(batch) >= 1000):
self._flush_batch(batch)
batch.clear()
last_flush = time.time()
except Exception as e:
logger.error(f"SQLite 写入异常: {e}")
if batch:
self._flush_batch(batch)
def _flush_batch(self, batch: List[Dict[str, Any]]):
"""批量写入数据库"""
try:
cursor = self._conn.cursor()
for record in batch:
cursor.execute("""
INSERT OR REPLACE INTO alert_records (
alert_id, camera_id, roi_id, bind_id, alert_type,
target_class, confidence, bbox, message,
image_path, status, created_at, processed_at,
duration_minutes, detections
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record['alert_id'],
record['camera_id'],
record['roi_id'],
record.get('bind_id'),
record['alert_type'],
record.get('target_class'),
record.get('confidence'),
record.get('bbox'),
record.get('message'),
record.get('image_path'),
record.get('status', 'pending'),
record['created_at'],
record.get('processed_at'),
record.get('duration_minutes'),
record.get('detections'),
))
self._conn.commit()
logger.debug(f"批量写入 {len(batch)} 条记录")
except Exception as e:
logger.error(f"批量写入失败: {e}")
def _cleanup_worker(self):
"""清理工作线程(每天执行一次)"""
while self._running:
try:
time.sleep(3600)
if self._running:
self.cleanup_old_data()
except Exception as e:
logger.error(f"数据清理异常: {e}")
def queue_alert(self, alert: AlertRecord):
"""将告警加入写入队列"""
record = {
'alert_id': alert.alert_id,
'camera_id': alert.camera_id,
'roi_id': alert.roi_id,
'bind_id': alert.bind_id,
'alert_type': alert.alert_type,
'target_class': alert.target_class,
'confidence': alert.confidence,
'bbox': str(alert.bbox) if alert.bbox else None,
'message': alert.message,
'image_path': alert.image_path,
'status': alert.status,
'created_at': alert.created_at.isoformat(),
'processed_at': alert.processed_at.isoformat() if alert.processed_at else None,
'duration_minutes': alert.duration_minutes,
'detections': alert.detections,
}
self._write_queue.put(record)
def get_alerts(
self,
camera_id: Optional[str] = None,
status: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""查询告警记录"""
cursor = self._conn.cursor()
query = "SELECT * FROM alert_records WHERE 1=1"
params = []
if camera_id:
query += " AND camera_id = ?"
params.append(camera_id)
if status:
query += " AND status = ?"
params.append(status)
if start_time:
query += " AND created_at >= ?"
params.append(start_time.isoformat())
if end_time:
query += " AND created_at <= ?"
params.append(end_time.isoformat())
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
columns = ['id', 'alert_id', 'camera_id', 'roi_id', 'alert_type',
'target_class', 'confidence', 'bbox', 'message', 'image_path',
'status', 'created_at', 'processed_at']
return [dict(zip(columns, row)) for row in rows]
def update_status(self, alert_id: str, status: str) -> bool:
"""更新告警状态"""
try:
cursor = self._conn.cursor()
cursor.execute("""
UPDATE alert_records
SET status = ?, processed_at = ?
WHERE alert_id = ?
""", (status, datetime.now().isoformat(), alert_id))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"更新状态失败: {e}")
return False
def cleanup_old_data(self):
"""清理过期数据"""
try:
cutoff = (datetime.now() - timedelta(days=self.config.retention_days)).isoformat()
cursor = self._conn.cursor()
cursor.execute("SELECT image_path FROM alert_records WHERE created_at < ?", (cutoff,))
images = cursor.fetchall()
for (img_path,) in images:
if img_path and os.path.exists(img_path):
try:
os.remove(img_path)
except Exception:
pass
cursor.execute("DELETE FROM alert_records WHERE created_at < ?", (cutoff,))
deleted = cursor.rowcount
self._conn.commit()
logger.info(f"清理完成: 删除 {deleted} 条过期记录")
return deleted
except Exception as e:
logger.error(f"数据清理失败: {e}")
return 0
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
cursor = self._conn.cursor()
cursor.execute("SELECT COUNT(*) FROM alert_records")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE status = 'pending'")
pending = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM alert_records WHERE created_at > ?",
((datetime.now() - timedelta(hours=24)).isoformat(),))
today = cursor.fetchone()[0]
db_size = os.path.getsize(self.config.db_path) / (1024 * 1024)
return {
"total_alerts": total,
"pending_alerts": pending,
"today_alerts": today,
"db_size_mb": round(db_size, 2),
"queue_size": self._write_queue.qsize(),
"retention_days": self.config.retention_days,
}
def close(self):
"""关闭数据库"""
self._running = False
if self._write_thread and self._write_thread.is_alive():
self._write_thread.join(timeout=10)
if self._conn:
self._conn.close()
logger.info("SQLite 数据库已关闭")
def save_camera_config(self, camera_id: str, rtsp_url: str, **kwargs) -> bool:
"""保存摄像头配置"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO camera_configs (
camera_id, rtsp_url, camera_name, status, enabled,
location, roi_group_id, extra_params, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
camera_id, rtsp_url,
kwargs.get('camera_name'),
kwargs.get('status', True),
kwargs.get('enabled', True),
kwargs.get('location'),
kwargs.get('roi_group_id'),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存摄像头配置失败: {e}")
return False
def get_camera_config(self, camera_id: str) -> Optional[Dict[str, Any]]:
"""获取摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
row = cursor.fetchone()
if row:
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
return dict(zip(columns, row))
return None
except Exception as e:
logger.error(f"获取摄像头配置失败: {e}")
return None
def get_all_camera_configs(self) -> List[Dict[str, Any]]:
"""获取所有摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取所有摄像头配置失败: {e}")
return []
def delete_camera_config(self, camera_id: str) -> bool:
"""删除摄像头配置"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM camera_configs WHERE camera_id = ?", (camera_id,))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除摄像头配置失败: {e}")
return False
def save_roi_config(self, roi_id: str, camera_id: str, roi_type: str,
coordinates: List, **kwargs) -> bool:
"""保存ROI配置空间信息不包含业务参数"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO roi_configs (
roi_id, camera_id, roi_type, coordinates,
enabled, priority, extra_params, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
roi_id, camera_id, roi_type, str(coordinates),
kwargs.get('enabled', True),
kwargs.get('priority', 0),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存ROI配置失败: {e}")
return False
def get_roi_config(self, roi_id: str) -> Optional[Dict[str, Any]]:
"""获取ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs WHERE roi_id = ?", (roi_id,))
row = cursor.fetchone()
if row:
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'enabled', 'priority', 'extra_params', 'updated_at']
result = dict(zip(columns, row))
try:
result['coordinates'] = _normalize_coordinates(result['coordinates'])
except:
pass
return result
return None
except Exception as e:
logger.error(f"获取ROI配置失败: {e}")
return None
def get_rois_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
"""获取指定摄像头的所有ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,))
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'enabled', 'priority', 'extra_params', 'updated_at']
results = []
for row in cursor.fetchall():
r = dict(zip(columns, row))
try:
r['coordinates'] = _normalize_coordinates(r['coordinates'])
except:
pass
results.append(r)
return results
except Exception as e:
logger.error(f"获取ROI配置失败: {e}")
return []
def get_all_roi_configs(self) -> List[Dict[str, Any]]:
"""获取所有ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_configs ORDER BY camera_id, roi_id")
columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates',
'enabled', 'priority', 'extra_params', 'updated_at']
results = []
for row in cursor.fetchall():
r = dict(zip(columns, row))
try:
r['coordinates'] = _normalize_coordinates(r['coordinates'])
except:
pass
results.append(r)
return results
except Exception as e:
logger.error(f"获取所有ROI配置失败: {e}")
return []
def delete_roi_config(self, roi_id: str) -> bool:
"""删除ROI配置"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM roi_configs WHERE roi_id = ?", (roi_id,))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除ROI配置失败: {e}")
return False
def save_algorithm(self, algo_code: str, algo_name: str, target_class: str = "person",
param_schema: Optional[str] = None, description: Optional[str] = None,
is_active: bool = True) -> bool:
"""保存算法配置"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO algorithm_registry (
algo_code, algo_name, target_class, param_schema, description,
is_active, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
algo_code, algo_name, target_class, param_schema, description,
is_active, now, now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存算法配置失败: {e}")
return False
def get_algorithm(self, algo_code: str) -> Optional[Dict[str, Any]]:
"""获取算法配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM algorithm_registry WHERE algo_code = ?", (algo_code,))
row = cursor.fetchone()
if row:
columns = ['algo_code', 'algo_name', 'target_class', 'param_schema',
'description', 'is_active', 'created_at', 'updated_at']
result = dict(zip(columns, row))
try:
if result.get('param_schema'):
result['param_schema'] = json.loads(result['param_schema'])
except:
pass
return result
return None
except Exception as e:
logger.error(f"获取算法配置失败: {e}")
return None
def get_all_algorithms(self, active_only: bool = True) -> List[Dict[str, Any]]:
"""获取所有算法配置"""
try:
cursor = self._conn.cursor()
query = "SELECT * FROM algorithm_registry"
if active_only:
query += " WHERE is_active = 1"
cursor.execute(query)
columns = ['algo_code', 'algo_name', 'target_class', 'param_schema',
'description', 'is_active', 'created_at', 'updated_at']
results = []
for row in cursor.fetchall():
r = dict(zip(columns, row))
try:
if r.get('param_schema'):
r['param_schema'] = json.loads(r['param_schema'])
except:
pass
results.append(r)
return results
except Exception as e:
logger.error(f"获取所有算法配置失败: {e}")
return []
def save_roi_algo_bind(self, bind_id: str, roi_id: str, algo_code: str,
params: Dict[str, Any], priority: int = 0,
enabled: bool = True) -> bool:
"""保存ROI与算法的绑定关系"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO roi_algo_bind (
bind_id, roi_id, algo_code, params, priority,
enabled, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
bind_id, roi_id, algo_code, json.dumps(params),
priority, enabled, now, now
))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存ROI算法绑定失败: {e}")
return False
def get_roi_algo_bind(self, bind_id: str) -> Optional[Dict[str, Any]]:
"""获取ROI算法绑定配置"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM roi_algo_bind WHERE bind_id = ?", (bind_id,))
row = cursor.fetchone()
if row:
columns = ['bind_id', 'roi_id', 'algo_code', 'params',
'priority', 'enabled', 'created_at', 'updated_at']
result = dict(zip(columns, row))
try:
if result.get('params'):
result['params'] = json.loads(result['params'])
except:
pass
return result
return None
except Exception as e:
logger.error(f"获取ROI算法绑定失败: {e}")
return None
def get_bindings_by_roi(self, roi_id: str) -> List[Dict[str, Any]]:
"""获取指定ROI的所有算法绑定"""
try:
cursor = self._conn.cursor()
cursor.execute("""
SELECT b.*, a.algo_name, a.target_class
FROM roi_algo_bind b
LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code
WHERE b.roi_id = ? AND b.enabled = 1
ORDER BY b.priority DESC
""", (roi_id,))
results = []
for row in cursor.fetchall():
result = dict(zip(
['bind_id', 'roi_id', 'algo_code', 'params', 'priority',
'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class'],
row
))
try:
if result.get('params'):
result['params'] = json.loads(result['params'])
except:
pass
results.append(result)
return results
except Exception as e:
logger.error(f"获取ROI算法绑定失败: {e}")
return []
def get_bindings_by_camera(self, camera_id: str) -> List[Dict[str, Any]]:
"""获取指定摄像头的所有ROI算法绑定"""
try:
cursor = self._conn.cursor()
cursor.execute("""
SELECT b.*, a.algo_name, a.target_class, r.roi_type, r.coordinates
FROM roi_algo_bind b
LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code
LEFT JOIN roi_configs r ON b.roi_id = r.roi_id
WHERE r.camera_id = ? AND b.enabled = 1 AND r.enabled = 1
ORDER BY r.priority DESC, b.priority DESC
""", (camera_id,))
results = []
for row in cursor.fetchall():
result = dict(zip(
['bind_id', 'roi_id', 'algo_code', 'params', 'priority',
'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class',
'roi_type', 'coordinates'],
row
))
try:
if result.get('params'):
result['params'] = json.loads(result['params'])
if result.get('coordinates'):
result['coordinates'] = _normalize_coordinates(result['coordinates'])
except:
pass
results.append(result)
return results
except Exception as e:
logger.error(f"获取摄像头算法绑定失败: {e}")
return []
def delete_roi_algo_bind(self, bind_id: str) -> bool:
"""删除ROI算法绑定"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM roi_algo_bind WHERE bind_id = ?", (bind_id,))
self._conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除ROI算法绑定失败: {e}")
return False
def delete_bindings_by_roi(self, roi_id: str) -> int:
"""删除指定ROI的所有算法绑定"""
try:
cursor = self._conn.cursor()
cursor.execute("DELETE FROM roi_algo_bind WHERE roi_id = ?", (roi_id,))
self._conn.commit()
return cursor.rowcount
except Exception as e:
logger.error(f"删除ROI算法绑定失败: {e}")
return 0
def log_config_update(
self,
config_type: str,
config_id: Optional[str],
old_value: Any,
new_value: Any,
updated_by: str = "system"
):
"""记录配置更新日志"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT INTO config_update_log (
config_type, config_id, old_value, new_value, updated_by, updated_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (
config_type,
config_id,
str(old_value) if old_value else None,
str(new_value) if new_value else None,
updated_by,
now
))
self._conn.commit()
logger.info(f"配置更新日志已记录: {config_type}/{config_id}")
except Exception as e:
logger.error(f"记录配置更新日志失败: {e}")
def get_config_update_log(
self,
config_type: Optional[str] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""获取配置更新日志"""
try:
cursor = self._conn.cursor()
query = "SELECT * FROM config_update_log WHERE 1=1"
params = []
if config_type:
query += " AND config_type = ?"
params.append(config_type)
query += " ORDER BY id DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
columns = ['id', 'config_type', 'config_id', 'old_value', 'new_value',
'updated_by', 'updated_at']
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取配置更新日志失败: {e}")
return []
def get_sqlite_manager() -> SQLiteManager:
"""获取 SQLite 管理器单例"""
return SQLiteManager()