2026-01-29 18:33:12 +08:00
|
|
|
|
"""
|
2026-02-10 15:21:45 +08:00
|
|
|
|
配置同步模块 - 双 Redis 三层权威模型
|
|
|
|
|
|
|
|
|
|
|
|
架构:
|
|
|
|
|
|
MySQL(云端权威源) → 云端 Redis(分发 + 版本控制) → 边缘 Redis(本地缓存 + 离线运行)
|
2026-01-30 11:46:15 +08:00
|
|
|
|
|
|
|
|
|
|
存储策略:
|
2026-02-10 15:21:45 +08:00
|
|
|
|
- 云端 Redis: 配置分发中枢,通过 Stream 推送变更事件
|
|
|
|
|
|
- 本地 Redis: 边缘自治核心,算法进程只读本地 Redis
|
|
|
|
|
|
- SQLite: 本地持久化存储(摄像头、ROI 配置)
|
2026-01-30 11:46:15 +08:00
|
|
|
|
- 内存缓存: 热点配置快速访问
|
2026-01-29 18:33:12 +08:00
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
import logging
|
2026-02-11 09:57:02 +08:00
|
|
|
|
import os
|
2026-01-29 18:33:12 +08:00
|
|
|
|
import threading
|
|
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
|
|
|
|
|
|
|
|
|
|
import redis
|
|
|
|
|
|
from redis import Redis
|
|
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
from config.settings import get_settings
|
2026-01-30 11:46:15 +08:00
|
|
|
|
from config.database import get_sqlite_manager, SQLiteManager
|
2026-02-10 15:21:45 +08:00
|
|
|
|
from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ROIInfoNew, ROIAlgoBind
|
2026-01-29 18:33:12 +08:00
|
|
|
|
from utils.version_control import get_version_control
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
# ==================== Redis key constants ====================

# Cloud Redis keys
CLOUD_DEVICE_CONFIG_KEY = "device:{device_id}:config"      # latest device configuration (JSON)
CLOUD_DEVICE_VERSION_KEY = "device:{device_id}:version"    # configuration version number
CLOUD_CONFIG_STREAM = "device_config_stream"               # configuration-change Stream

# Local Redis keys
LOCAL_CONFIG_CURRENT = "local:device:config:current"       # configuration currently in effect
LOCAL_CONFIG_BACKUP = "local:device:config:backup"         # last successful configuration (used for rollback)
LOCAL_CONFIG_VERSION = "local:device:config:version"       # current version number
LOCAL_STREAM_LAST_ID = "local:device:config:stream_last_id"  # ID of the last consumed Stream entry
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
class ConfigCache:
    """Thread-safe in-memory cache with a sliding TTL and a size bound.

    Each entry remembers the time of its last successful access. ``get``
    returns an entry only while ``now - last_access < ttl`` and refreshes
    the timestamp on every hit; stale entries are dropped on read. When the
    cache is full, inserting a *new* key first evicts the least recently
    accessed entries.

    Args:
        max_size: Entry count at which inserting a new key triggers eviction.
        ttl: Maximum idle time of an entry, in seconds.
    """

    def __init__(self, max_size: int = 1000, ttl: int = 300):
        self._cache: Dict[str, Any] = {}
        # key -> time.time() of the last set/get hit
        self._access_times: Dict[str, float] = {}
        self._max_size = max_size
        self._ttl = ttl
        # Re-entrant so public methods can call the private helpers while locked.
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` if absent or expired."""
        with self._lock:
            if key not in self._cache:
                return None
            now = time.time()
            if (now - self._access_times.get(key, 0)) < self._ttl:
                # A hit refreshes the timestamp (sliding expiration).
                self._access_times[key] = now
                return self._cache[key]
            self._delete(key)
            return None

    def set(self, key: str, value: Any):
        """Store *value* under *key*, evicting old entries if the cache is full.

        Overwriting an existing key never grows the cache, so it must not
        evict anything; only a genuinely new key can trigger eviction.
        """
        with self._lock:
            if key not in self._cache and len(self._cache) >= self._max_size:
                self._evict_lru()
            self._cache[key] = value
            self._access_times[key] = time.time()

    def delete(self, key: str):
        """Remove *key* from the cache; a missing key is ignored."""
        with self._lock:
            self._delete(key)

    def _delete(self, key: str):
        """Remove *key* without taking the lock (caller must hold it)."""
        self._cache.pop(key, None)
        self._access_times.pop(key, None)

    def _evict_lru(self):
        """Evict the entries sharing the oldest access time (at most 10)."""
        if not self._access_times:
            return
        min_access_time = min(self._access_times.values())
        lru_keys = [k for k, v in self._access_times.items() if v == min_access_time]
        for key in lru_keys[:10]:
            self._delete(key)

    def clear(self):
        """Drop every entry."""
        with self._lock:
            self._cache.clear()
            self._access_times.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the current size together with the configured limits."""
        with self._lock:
            return {
                "size": len(self._cache),
                "max_size": self._max_size,
                "ttl": self._ttl,
            }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ConfigSyncManager:
    """Configuration sync manager - dual Redis + Stream mode.

    Startup flow:
        1. Connect to the local Redis and load the previous configuration
           (keeps the node usable offline).
        2. Connect to the cloud Redis, pull the latest configuration and
           compare versions.
        3. Start the Stream listener thread to keep receiving incremental
           changes.

    Configuration change flow:
        1. Receive a Stream event (device_id, version, action).
        2. Pull the full configuration from the cloud Redis.
        3. Check that version > local version.
        4. Back up the current configuration -> apply the new one -> sync
           it to SQLite.
        5. Fire the callbacks so the algorithm modules can hot-reload.
        6. On failure, roll back to the backup configuration.
    """

    # The single shared instance handed out by __new__.
    _instance = None
    # Serializes creation of the shared instance.
    _lock = threading.Lock()
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def __new__(cls):
|
|
|
|
|
|
if cls._instance is None:
|
|
|
|
|
|
with cls._lock:
|
|
|
|
|
|
if cls._instance is None:
|
|
|
|
|
|
cls._instance = super().__new__(cls)
|
|
|
|
|
|
cls._instance._initialized = False
|
|
|
|
|
|
return cls._instance
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def __init__(self):
    """Initialize the manager state once; later constructions are no-ops.

    Reads the device id, configured version and sync mode from the
    application settings, creates the in-memory cache and, when the sync
    mode is ``"REDIS"``, connects to the local and then the cloud Redis.
    """
    already_set_up = self._initialized
    if already_set_up:
        return

    settings = get_settings()
    self._device_id = settings.mqtt.device_id  # edge node ID
    self._config_version = settings.config_version
    self._sync_mode = settings.config_sync_mode
    self._cache = ConfigCache()
    self._db_manager = None  # created lazily by _init_database()

    # The two Redis clients (None while disconnected)
    self._cloud_redis: Optional[Redis] = None
    self._local_redis: Optional[Redis] = None

    # Stream listener state
    self._stream_thread: Optional[threading.Thread] = None
    self._stop_event = threading.Event()
    self._stream_last_id = "0"  # consume the stream from the beginning

    # topic -> callbacks fired on configuration changes
    self._callbacks: Dict[str, Set[Callable]] = {}

    self._version_control = get_version_control()
    self._initialized = True

    if self._sync_mode != "REDIS":
        logger.info("CONFIG_SYNC_MODE=LOCAL: 跳过 Redis 初始化,仅使用本地 SQLite")
        return

    self._init_local_redis()
    self._init_cloud_redis()
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
|
|
|
|
|
# ==================== Redis 初始化 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def _init_local_redis(self):
    """Connect to the local Redis and restore the saved Stream position.

    On any failure the error is logged and ``self._local_redis`` is left
    as ``None``.
    """
    try:
        cfg = get_settings().local_redis
        connection_options = {
            "host": cfg.host,
            "port": cfg.port,
            "db": cfg.db,
            "password": cfg.password,
            "decode_responses": cfg.decode_responses,
            "socket_connect_timeout": 5,
            "socket_timeout": 5,
            "retry_on_timeout": True,
        }
        self._local_redis = redis.Redis(**connection_options)
        # Fail fast: ping raises if the server cannot be reached.
        self._local_redis.ping()
        logger.info(f"本地 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}")

        # Resume consuming the stream where the previous run stopped.
        last_id = self._local_redis.get(LOCAL_STREAM_LAST_ID)
        if not last_id:
            return
        self._stream_last_id = last_id
        logger.info(f"恢复 Stream 消费位置: {last_id}")
    except Exception as e:
        logger.error(f"本地 Redis 连接失败: {e}")
        self._local_redis = None
|
2026-02-05 16:56:48 +08:00
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
def _init_cloud_redis(self):
    """Connect to the cloud Redis.

    On any failure a warning is logged and ``self._cloud_redis`` is left
    as ``None``.
    """
    try:
        cfg = get_settings().cloud_redis
        connection_options = {
            "host": cfg.host,
            "port": cfg.port,
            "db": cfg.db,
            "password": cfg.password,
            "decode_responses": cfg.decode_responses,
            "socket_connect_timeout": 10,
            "socket_timeout": 10,
            "retry_on_timeout": True,
        }
        self._cloud_redis = redis.Redis(**connection_options)
        # Fail fast: ping raises if the server cannot be reached.
        self._cloud_redis.ping()
        logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}")
    except Exception as e:
        logger.warning(f"云端 Redis 连接失败(将使用本地缓存运行): {e}")
        self._cloud_redis = None
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def _init_database(self):
    """Obtain the SQLite manager on first use; later calls do nothing."""
    if self._db_manager is not None:
        return
    self._db_manager = get_sqlite_manager()
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
|
|
|
|
|
# ==================== 配置版本与属性 ====================
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
@property
def config_version(self) -> str:
    """The configuration version currently held in memory."""
    current_version = self._config_version
    return current_version
|
2026-02-05 16:56:48 +08:00
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
def get_config_version(self) -> int:
    """Return the version number stored in the local Redis.

    Returns:
        The stored version as an int, or ``0`` when the local Redis is not
        connected, holds no version, or the value cannot be read/parsed.
        Read and parse failures are logged instead of being swallowed
        silently, because a version of 0 makes every cloud configuration
        look newer than the local one.
    """
    if not self._local_redis:
        return 0
    try:
        ver = self._local_redis.get(LOCAL_CONFIG_VERSION)
        return int(ver) if ver else 0
    except Exception as e:
        logger.warning(f"读取本地配置版本失败,按 0 处理: {e}")
        return 0
|
2026-02-05 16:56:48 +08:00
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
# ==================== 启动与停止 ====================
|
2026-02-05 16:56:48 +08:00
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
def start_config_subscription(self):
    """Start configuration sync.

    In any mode other than ``"REDIS"`` this only logs a snapshot of the
    local configuration. Otherwise:

    1. Load the configuration from the local Redis (offline first).
    2. Sync the latest configuration from the cloud, if it is connected.
    3. Start the Stream listener thread unless one is already running.
    """
    if self._sync_mode != "REDIS":
        self._log_local_config_snapshot("LOCAL")
        return

    # Step 1: push the configuration cached in the local Redis into SQLite
    self._load_from_local_redis()

    # Step 2: try to pull the latest configuration from the cloud
    if self._cloud_redis:
        try:
            self._sync_from_cloud()
            logger.info("启动时云端配置同步完成")
        except Exception as e:
            logger.warning(f"启动时云端同步失败(使用本地缓存): {e}")

    # Step 3: start the Stream listener thread
    listener = self._stream_thread
    if listener is not None and listener.is_alive():
        return  # already listening

    self._stop_event.clear()
    listener = threading.Thread(
        target=self._listen_config_stream,
        name="ConfigStreamListener",
        daemon=True,
    )
    self._stream_thread = listener
    listener.start()
    logger.info("配置 Stream 监听线程已启动")
|
|
|
|
|
|
|
|
|
|
|
|
def stop_config_subscription(self):
    """Signal the Stream listener to stop and wait up to 5 s for it.

    The listener only checks the stop event between blocking reads, so it
    may outlive the join timeout. That case is reported as a warning
    rather than being logged as a successful stop.
    """
    self._stop_event.set()
    listener = self._stream_thread
    if listener and listener.is_alive():
        listener.join(timeout=5)
        if listener.is_alive():
            logger.warning("配置 Stream 监听线程未在 5s 内退出,将在当前读取结束后停止")
        else:
            logger.info("配置 Stream 监听线程已停止")
|
|
|
|
|
|
|
2026-02-11 09:57:02 +08:00
|
|
|
|
def _log_local_config_snapshot(self, source: str = "SQLite"):
    """Log how many cameras, ROIs and algorithm bindings SQLite holds.

    Args:
        source: Label included in the log lines to say where the
            configuration came from.
    """
    self._init_database()
    db = self._db_manager
    if not db:
        logger.warning(f"[EDGE] Local config snapshot skipped (no SQLite). source={source}")
        return

    try:
        cameras = db.get_all_camera_configs()
        rois = db.get_all_roi_configs()
        # Bindings are stored per ROI, so collect them ROI by ROI.
        binds = [
            bind
            for roi in rois
            for bind in db.get_bindings_by_roi(roi["roi_id"])
        ]
        logger.info(f"[EDGE] Loading config from local db ({source})...")
        logger.info(f"[EDGE] Camera count = {len(cameras)}")
        logger.info(f"[EDGE] ROI count = {len(rois)}")
        logger.info(f"[EDGE] Algorithm bindings = {len(binds)}")
    except Exception as e:
        logger.warning(f"[EDGE] Local config snapshot failed: {e}")
|
|
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
# ==================== Stream 监听 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def _listen_config_stream(self):
    """Consume the cloud Redis Stream until the stop event is set.

    Reconnects automatically, waiting 5 s at first and doubling the wait
    up to 60 s after each consecutive failure. The wait is reset only
    after an XREAD call has actually completed. Resetting it right after
    the connection check (as before) cancelled the doubling for every
    error raised while reading, so such errors were retried every 5 s
    forever.

    The consumed position is advanced after every message, even one whose
    handler failed, and is persisted to the local Redis on a best-effort
    basis.
    """
    initial_backoff = 5  # seconds
    max_backoff = 60
    backoff = initial_backoff

    while not self._stop_event.is_set():
        try:
            if not self._cloud_redis:
                self._init_cloud_redis()
                if not self._cloud_redis:
                    logger.warning(f"云端 Redis 不可用,{backoff}s 后重试...")
                    self._stop_event.wait(backoff)
                    backoff = min(backoff * 2, max_backoff)
                    continue

            logger.info(f"开始监听 Stream: {CLOUD_CONFIG_STREAM}, last_id={self._stream_last_id}")

            while not self._stop_event.is_set():
                # XREAD BLOCK 5000 ms, so stop_event is re-checked every 5 s
                result = self._cloud_redis.xread(
                    {CLOUD_CONFIG_STREAM: self._stream_last_id},
                    count=10,
                    block=5000,
                )
                # A completed read (even an empty one) shows the link works.
                backoff = initial_backoff

                if not result:
                    continue

                for _stream_name, messages in result:
                    for msg_id, msg_data in messages:
                        try:
                            self._handle_stream_event(msg_data)
                        except Exception as e:
                            logger.error(f"处理 Stream 事件失败: {e}")

                        # Advance the consumed position
                        self._stream_last_id = msg_id
                        if self._local_redis:
                            try:
                                self._local_redis.set(LOCAL_STREAM_LAST_ID, msg_id)
                            except Exception as e:
                                # Best effort: the in-memory position is still valid.
                                logger.warning(f"保存 Stream 消费位置失败: {e}")

        except redis.ConnectionError as e:
            if self._stop_event.is_set():
                return
            logger.warning(f"云端 Redis 连接断开: {e}, {backoff}s 后重连...")
            # Drop the client so the next iteration reconnects from scratch.
            self._cloud_redis = None
            self._stop_event.wait(backoff)
            backoff = min(backoff * 2, max_backoff)

        except Exception as e:
            if self._stop_event.is_set():
                return
            logger.error(f"Stream 监听异常: {e}, {backoff}s 后重试...")
            self._stop_event.wait(backoff)
            backoff = min(backoff * 2, max_backoff)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_stream_event(self, msg_data: dict):
    """Handle one configuration-change event from the Stream.

    Args:
        msg_data: Event fields; ``device_id``, ``version`` and ``action``
            are read, defaulting to ``""``, ``0`` and ``"UPDATE"``.

    Raises:
        ValueError: If ``version`` cannot be converted to an int.
    """
    device_id = msg_data.get("device_id", "")
    version = int(msg_data.get("version", 0))
    action = msg_data.get("action", "UPDATE")

    # Only handle events for this device; an empty device_id is a broadcast.
    addressed_elsewhere = bool(device_id) and device_id != self._device_id
    if addressed_elsewhere:
        return

    logger.info(f"收到配置变更: device={device_id}, version={version}, action={action}")

    local_version = self.get_config_version()

    # A rollback request is honoured regardless of the version it carries.
    if action == "ROLLBACK":
        self._rollback_config()
        return

    if version <= local_version:
        logger.info(f"忽略旧版本事件: cloud={version} <= local={local_version}")
        return

    self._sync_from_cloud()
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 云端同步 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def _sync_from_cloud(self):
    """Pull the latest configuration from the cloud Redis and apply it.

    Does nothing when the cloud Redis is not connected, when the cloud has
    no configuration for this device, or when the cloud version is not
    newer than the local one. Errors are logged, never raised.

    Rollback happens only if the failure occurred after applying had
    started. A failed fetch or an unparsable payload leaves the local
    state untouched; rolling back in that case would replace the current
    good configuration with the older backup. After a rollback the stored
    version is reset to the pre-sync value so the same cloud version is
    retried later instead of being skipped as "already up to date".
    """
    if not self._cloud_redis:
        return

    config_key = CLOUD_DEVICE_CONFIG_KEY.format(device_id=self._device_id)
    version_key = CLOUD_DEVICE_VERSION_KEY.format(device_id=self._device_id)

    apply_started = False
    local_version = 0
    previous_config_version = self._config_version

    try:
        config_json = self._cloud_redis.get(config_key)
        cloud_version = self._cloud_redis.get(version_key)
        cloud_version = int(cloud_version) if cloud_version else 0

        if not config_json:
            logger.info("云端无此设备配置,跳过同步")
            return

        local_version = self.get_config_version()
        if cloud_version <= local_version:
            logger.info(f"本地配置已是最新: local={local_version}, cloud={cloud_version}")
            return

        config_data = json.loads(config_json)

        # Back up the current configuration
        self._backup_current_config()

        # Apply the new configuration; from here on local state may change
        apply_started = True
        self._apply_config(config_data, cloud_version)

        logger.info(f"配置同步成功: version {local_version} → {cloud_version}")

    except Exception as e:
        logger.error(f"从云端同步配置失败: {e}")
        if not apply_started:
            # Nothing was modified locally, so keep the current configuration.
            return

        self._rollback_config()

        # Put the version markers back so this cloud version is retried.
        self._config_version = previous_config_version
        if self._local_redis:
            try:
                self._local_redis.set(LOCAL_CONFIG_VERSION, str(local_version))
            except Exception as restore_error:
                logger.warning(f"恢复本地配置版本失败: {restore_error}")
|
2026-02-05 16:56:48 +08:00
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
def _apply_config(self, config_data: dict, version: int):
    """Apply a configuration to the local Redis and SQLite.

    Args:
        config_data: The full configuration to apply.
        version: Version number of that configuration.

    Raises:
        Exception: Any failure is logged and re-raised to the caller.
    """
    version_text = str(version)
    try:
        # 1. Write to the local Redis (skipped when it is not connected)
        local = self._local_redis
        if local:
            serialized = json.dumps(config_data, ensure_ascii=False)
            local.set(LOCAL_CONFIG_CURRENT, serialized)
            local.set(LOCAL_CONFIG_VERSION, version_text)

        # 2. Sync to SQLite (the inference pipeline reads from SQLite)
        self._sync_config_to_sqlite(config_data)

        # 3. Invalidate the in-memory cache
        self._cache.clear()

        # 4. Update the in-memory version
        self._config_version = version_text

        # 5. Notify the registered callbacks
        update_event = {
            "type": "full",
            "version": version,
            "affected_items": ["camera", "roi", "bind"],
        }
        self._notify_callbacks("config_update", update_event)

        # 6. Record the version update
        self._version_control.record_update(
            version=version_text,
            update_type="配置同步",
            description=f"从云端同步配置 v{version}",
            updated_by="云端系统",
            affected_items=["camera", "roi", "bind"],
            details={"source": "cloud_redis"},
        )
    except Exception as e:
        logger.error(f"应用配置失败: {e}")
        raise
|
|
|
|
|
|
|
|
|
|
|
|
def _backup_current_config(self):
    """Copy the current configuration to the backup key in the local Redis.

    Does nothing when the local Redis is not connected or holds no current
    configuration. Failures are logged as warnings and not raised.
    """
    local = self._local_redis
    if not local:
        return
    try:
        current = local.get(LOCAL_CONFIG_CURRENT)
        if not current:
            return
        local.set(LOCAL_CONFIG_BACKUP, current)
        logger.info("当前配置已备份")
    except Exception as e:
        logger.warning(f"配置备份失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _rollback_config(self):
    """Restore the backup configuration as the current one.

    Writes the backup over the current key in the local Redis, syncs it to
    SQLite, clears the in-memory cache and fires a ``config_update``
    callback of type ``rollback``. Failures are logged and not raised.
    """
    local = self._local_redis
    if not local:
        logger.warning("本地 Redis 不可用,无法回滚")
        return

    try:
        backup = local.get(LOCAL_CONFIG_BACKUP)
        if not backup:
            logger.warning("无备份配置,无法回滚")
            return

        # Parse first so an unreadable backup never overwrites the current key.
        config_data = json.loads(backup)
        local.set(LOCAL_CONFIG_CURRENT, backup)
        self._sync_config_to_sqlite(config_data)
        self._cache.clear()

        logger.info("配置已回滚到上一个版本")

        rollback_event = {
            "type": "rollback",
            "affected_items": ["camera", "roi", "bind"],
        }
        self._notify_callbacks("config_update", rollback_event)
    except Exception as e:
        logger.error(f"配置回滚失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _load_from_local_redis(self):
    """Load the configuration cached in the local Redis into SQLite at startup.

    When a cached configuration exists it is synced to SQLite and the
    in-memory version is updated from the stored version (if any).
    Failures are logged as warnings and not raised.
    """
    local = self._local_redis
    if not local:
        logger.info("本地 Redis 不可用,使用 SQLite 现有配置")
        return

    try:
        config_json = local.get(LOCAL_CONFIG_CURRENT)
        if not config_json:
            logger.info("本地 Redis 无缓存配置,使用 SQLite 现有数据")
            return

        self._sync_config_to_sqlite(json.loads(config_json))
        version = local.get(LOCAL_CONFIG_VERSION)
        if version:
            self._config_version = version
        logger.info(f"从本地 Redis 加载配置成功, version={version}")
    except Exception as e:
        logger.warning(f"从本地 Redis 加载配置失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== SQLite 同步 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def _sync_config_to_sqlite(self, config_data: dict):
    """Write the cameras, ROIs and algorithm bindings of a config to SQLite.

    Each item is saved independently: a failing item is logged and
    skipped so the remaining items are still written. Nothing is raised.

    Args:
        config_data: Configuration with optional ``cameras``, ``rois`` and
            ``binds`` lists; missing lists are treated as empty.
    """
    self._init_database()
    if not self._db_manager:
        logger.warning("SQLite 不可用,跳过同步")
        return

    count = 0  # number of items saved successfully
    try:
        # Cameras
        for cam in config_data.get("cameras", []):
            try:
                self._db_manager.save_camera_config(
                    camera_id=cam.get("camera_id", ""),
                    rtsp_url=cam.get("rtsp_url", ""),
                    camera_name=cam.get("camera_name", ""),
                    enabled=cam.get("enabled", True),
                    location=cam.get("location", ""),
                )
                count += 1
            except Exception as e:
                logger.error(f"同步摄像头配置失败: {e}")

        # ROIs
        for roi in config_data.get("rois", []):
            if not isinstance(roi, dict):
                # Message restored: the original text was lost to an
                # encoding error and logged only "?? ROI ????".
                logger.error(f"同步 ROI 配置失败: invalid roi item type={type(roi)}")
                continue
            try:
                coordinates = roi.get("coordinates", [])
                # Accept both shapes: a rectangle dict (x/y/w/h) and a
                # polygon given as a list of {x, y} dicts.
                if isinstance(coordinates, dict):
                    coordinates = {
                        "x": coordinates.get("x", 0),
                        "y": coordinates.get("y", 0),
                        "w": coordinates.get("w", 0),
                        "h": coordinates.get("h", 0),
                    }
                elif coordinates and isinstance(coordinates, list) and isinstance(coordinates[0], dict):
                    coordinates = [[p.get("x", 0), p.get("y", 0)] for p in coordinates]

                self._db_manager.save_roi_config(
                    roi_id=roi.get("roi_id", ""),
                    camera_id=roi.get("camera_id", ""),
                    roi_type=roi.get("roi_type", "polygon"),
                    coordinates=coordinates,
                    enabled=roi.get("enabled", True),
                    priority=roi.get("priority", 0),
                )
                count += 1
            except Exception as e:
                logger.error(f"同步 ROI 配置失败: {e}")

        # Algorithm bindings
        for bind in config_data.get("binds", []):
            try:
                self._db_manager.save_roi_algo_bind(
                    bind_id=bind.get("bind_id", ""),
                    roi_id=bind.get("roi_id", ""),
                    algo_code=bind.get("algo_code", ""),
                    params=bind.get("params", {}),
                    priority=bind.get("priority", 0),
                    enabled=bind.get("enabled", True),
                )
                count += 1
            except Exception as e:
                logger.error(f"同步算法绑定失败: {e}")

        logger.info(f"配置同步到 SQLite 完成: {count} 条记录")

    except Exception as e:
        logger.error(f"配置同步到 SQLite 失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 回调管理 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def register_callback(self, topic: str, callback: Callable):
    """Register *callback* to be invoked on changes published under *topic*.

    Registering the same callback twice for a topic has no further effect.
    """
    listeners = self._callbacks.setdefault(topic, set())
    listeners.add(callback)
    logger.info(f"已注册配置变更回调: {topic}")
|
|
|
|
|
|
|
|
|
|
|
|
def unregister_callback(self, topic: str, callback: Callable):
    """Remove *callback* from *topic*; unknown topics or callbacks are ignored."""
    listeners = self._callbacks.get(topic)
    if listeners is None:
        return
    listeners.discard(callback)
|
|
|
|
|
|
|
|
|
|
|
|
def _notify_callbacks(self, topic: str, data: Dict[str, Any]):
|
|
|
|
|
|
if topic in self._callbacks:
|
|
|
|
|
|
for callback in self._callbacks[topic]:
|
|
|
|
|
|
try:
|
|
|
|
|
|
callback(topic, data)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"配置变更回调执行失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 配置读取(供推理管线使用) ====================
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def get_cameras(self, force_refresh: bool = False) -> List[CameraInfoModel]:
    """Return the camera configurations (read from SQLite, cached in memory).

    Args:
        force_refresh: Skip the cache and read from SQLite.

    Returns:
        The cameras as models; on a read failure, whatever is still cached
        or an empty list. An empty list if SQLite is unavailable.
    """
    cache_key = "cameras"

    cached = None if force_refresh else self._cache.get(cache_key)
    if cached is not None:
        return cached

    self._init_database()
    if self._db_manager is None:
        return []

    try:
        rows = self._db_manager.get_all_camera_configs()
        result = [CameraInfoModel.from_dict(row) for row in rows]
        self._cache.set(cache_key, result)
        logger.info(f"已加载摄像头配置: {len(result)} 个")
        return result
    except Exception as e:
        logger.error(f"获取摄像头配置失败: {e}")
        fallback = self._cache.get(cache_key)
        return fallback or []
|
|
|
|
|
|
|
|
|
|
|
|
def get_roi_configs(self, camera_id: Optional[str] = None,
                    force_refresh: bool = False) -> List[ROIInfo]:
    """Return the ROI configurations, optionally limited to one camera.

    Args:
        camera_id: Only return ROIs of this camera; all ROIs when empty.
        force_refresh: Skip the cache and read from SQLite.

    Returns:
        The ROIs as models; on a read failure, whatever is still cached or
        an empty list. An empty list if SQLite is unavailable.
    """
    cache_key = f"rois_{camera_id}" if camera_id else "rois_all"

    cached = None if force_refresh else self._cache.get(cache_key)
    if cached is not None:
        return cached

    self._init_database()
    if self._db_manager is None:
        return []

    try:
        roi_configs = (
            self._db_manager.get_rois_by_camera(camera_id)
            if camera_id
            else self._db_manager.get_all_roi_configs()
        )
        result = [ROIInfo.from_dict(row) for row in roi_configs]
        self._cache.set(cache_key, result)
        return result
    except Exception as e:
        logger.error(f"获取 ROI 配置失败: {e}")
        fallback = self._cache.get(cache_key)
        return fallback or []
|
|
|
|
|
|
|
2026-02-03 14:26:52 +08:00
|
|
|
|
def get_roi_configs_with_bindings(self, camera_id: Optional[str] = None,
                                  force_refresh: bool = False) -> List[ROIInfoNew]:
    """Return the ROI configurations together with their algorithm bindings.

    Args:
        camera_id: Only return ROIs of this camera; all ROIs when empty.
        force_refresh: Skip the cache and read from SQLite.

    Returns:
        ROI models sorted by descending ``priority``, each carrying the
        bindings found for it; on a read failure, whatever is still cached
        or an empty list. An empty list if SQLite is unavailable.
    """
    cache_key = f"rois_bindings_{camera_id}" if camera_id else "rois_bindings_all"

    cached = None if force_refresh else self._cache.get(cache_key)
    if cached is not None:
        return cached

    self._init_database()
    if self._db_manager is None:
        return []

    try:
        db = self._db_manager
        if camera_id:
            roi_configs = db.get_rois_by_camera(camera_id)
            bindings_list = db.get_bindings_by_camera(camera_id)
        else:
            roi_configs = db.get_all_roi_configs()
            # No camera filter: gather the bindings ROI by ROI.
            bindings_list = []
            for roi in roi_configs:
                bindings_list.extend(db.get_bindings_by_roi(roi['roi_id']))

        roi_dict = {r['roi_id']: r for r in roi_configs}

        # Group the raw bindings by the ROI they belong to.
        bindings_dict: Dict[str, list] = {}
        for b in bindings_list:
            bindings_dict.setdefault(b['roi_id'], []).append(b)

        result = []
        for roi_id, roi_data in roi_dict.items():
            roi_info = ROIInfoNew.from_dict(roi_data)
            raw_bindings = bindings_dict.get(roi_id)
            if raw_bindings is not None:
                roi_info.bindings = [ROIAlgoBind.from_dict(b) for b in raw_bindings]
            result.append(roi_info)

        # Highest priority first; sorted() is stable, like list.sort().
        result = sorted(result, key=lambda x: x.priority, reverse=True)
        self._cache.set(cache_key, result)
        logger.info(f"已加载 ROI 配置(含绑定): {len(result)} 个")
        return result

    except Exception as e:
        logger.error(f"获取 ROI 配置(含绑定)失败: {e}")
        fallback = self._cache.get(cache_key)
        return fallback or []
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def get_camera_rois(self, camera_id: str) -> List[ROIInfo]:
    """Return the ROI configurations of one camera (see ``get_roi_configs``)."""
    camera_rois = self.get_roi_configs(camera_id=camera_id)
    return camera_rois
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def get_config_by_id(self, config_type: str, config_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single camera or ROI configuration in SQLite.

    Args:
        config_type: ``"camera"`` or ``"roi"``.
        config_id: Identifier of the camera or ROI.

    Returns:
        Whatever the database manager returns for that id, or ``None``
        when SQLite is unavailable, the type is not recognised, or the
        lookup fails (the failure is logged).
    """
    self._init_database()
    # Same guard as the other getters: without it a missing database showed
    # up as a logged AttributeError instead of a quiet "no result".
    if self._db_manager is None:
        return None
    try:
        if config_type == "camera":
            return self._db_manager.get_camera_config(config_id)
        if config_type == "roi":
            return self._db_manager.get_roi_config(config_id)
    except Exception as e:
        logger.error(f"获取配置失败: {e}")
    return None
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
|
|
|
|
|
def get_current_config(self) -> Optional[dict]:
    """Return the full current configuration stored in the local Redis.

    Returns:
        The decoded configuration, or ``None`` when the local Redis is not
        connected, holds no configuration, or reading/decoding fails (the
        failure is logged).
    """
    local = self._local_redis
    if not local:
        return None
    try:
        config_json = local.get(LOCAL_CONFIG_CURRENT)
        if not config_json:
            return None
        return json.loads(config_json)
    except Exception as e:
        logger.error(f"获取当前配置失败: {e}")
        return None
|
|
|
|
|
|
|
2026-02-11 09:57:02 +08:00
|
|
|
|
def _get_current_config_from_local_redis(self) -> Optional[dict]:
    """Return the current configuration from the local Redis, without logging.

    Returns:
        The decoded configuration, or ``None`` when the local Redis is not
        connected, holds no configuration, or reading/decoding fails.
    """
    local = self._local_redis
    if not local:
        return None
    try:
        config_json = local.get(LOCAL_CONFIG_CURRENT)
        if not config_json:
            return None
        return json.loads(config_json)
    except Exception:
        # Deliberately quiet: callers treat None as "fall back to SQLite".
        return None
|
|
|
|
|
|
|
|
|
|
|
|
def get_bindings_from_redis(self, roi_id: str) -> List[Dict[str, Any]]:
    """Get ROI bindings (LOCAL mode reads them from SQLite).

    In ``"REDIS"`` sync mode the ``"binds"`` list of the locally cached
    configuration is used when such a configuration is available; otherwise
    the bindings come from the database manager.

    Args:
        roi_id: ROI to filter by. A falsy value selects the bindings of
            every ROI.

    Returns:
        The matching binding dicts; ``[]`` when no database manager exists.
    """
    if self._sync_mode == "REDIS":
        snapshot = self._get_current_config_from_local_redis()
        if snapshot:
            cached_binds = snapshot.get("binds", [])
            if not roi_id:
                return cached_binds
            return [entry for entry in cached_binds if entry.get("roi_id") == roi_id]

    # LOCAL mode, or REDIS mode without a usable cached configuration.
    self._init_database()
    db = self._db_manager
    if not db:
        return []
    if roi_id:
        return db.get_bindings_by_roi(roi_id)

    # No filter: gather the bindings of every stored ROI, in ROI order.
    return [
        bind
        for roi in db.get_all_roi_configs()
        for bind in db.get_bindings_by_roi(roi["roi_id"])
    ]
|
|
|
|
|
|
|
|
|
|
|
|
def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]:
    """Get a single bind (LOCAL mode reads it from SQLite).

    In ``"REDIS"`` sync mode the locally cached configuration is searched
    first; when it is unavailable or holds no entry with this ``bind_id``,
    the database manager is asked instead.

    Returns:
        The bind dict, or ``None`` when no database manager exists;
        otherwise whatever ``get_roi_algo_bind`` returns.
    """
    if self._sync_mode == "REDIS":
        snapshot = self._get_current_config_from_local_redis()
        if snapshot:
            cached_hit = next(
                (
                    entry
                    for entry in snapshot.get("binds", [])
                    if entry.get("bind_id") == bind_id
                ),
                None,
            )
            if cached_hit is not None:
                return cached_hit

    self._init_database()
    if not self._db_manager:
        return None
    return self._db_manager.get_roi_algo_bind(bind_id)
|
|
|
|
|
|
|
|
|
|
|
|
def reload_local_config_from_file(self) -> bool:
    """Local debugging: read the configuration from a JSON file and sync it to SQLite.

    The file path comes from ``get_settings().debug.local_config_path``; the
    decoded document is handed to ``reload_local_config`` with
    ``source="FILE"``.

    Returns:
        The result of ``reload_local_config``, or ``False`` when the file
        does not exist or reading/decoding/applying it raised.
    """
    config_path = get_settings().debug.local_config_path
    try:
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as handle:
                payload = json.load(handle)
            return self.reload_local_config(payload, source="FILE")

        logger.warning(f"本地配置文件不存在: {config_path}")
        return False
    except Exception as e:
        logger.error(f"本地配置文件加载失败: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
def _clear_rois_for_camera_ids(self, camera_ids: List[str]):
    """Delete the stored ROIs, and their bindings, of the given cameras.

    Does nothing when ``camera_ids`` is empty or no database manager is
    available. For every ROI the bindings are deleted before the ROI itself;
    ROI rows without a ``roi_id`` are skipped.
    """
    if not camera_ids:
        return

    self._init_database()
    db = self._db_manager
    if not db:
        return

    for camera_id in camera_ids:
        for roi in db.get_rois_by_camera(camera_id):
            roi_id = roi.get("roi_id")
            if not roi_id:
                continue
            db.delete_bindings_by_roi(roi_id)
            db.delete_roi_config(roi_id)
|
|
|
|
|
|
|
|
|
|
|
|
def reload_local_config(self, config_data: dict, source: str = "LOCAL") -> bool:
    """Normalize a configuration payload and apply it to local storage.

    Steps, in order:

    1. Unwrap the payload when it arrives as ``{"data": {...}}``.
    2. Resolve the ROI list (``rois`` / ``roiConfigs`` / ``roi_list``) and
       the bind list (``binds`` / ``roiAlgoBinds`` / ``algoBinds`` /
       ``bindings``), copy camelCase identifiers on their dict entries to
       the snake_case names, and store the lists under ``"rois"`` and
       ``"binds"``.
    3. Delete the stored ROIs (and bindings) of every camera referenced by
       the payload via ``cameras[*].camera_id``, ``camera_ids`` or
       ``rois[*].camera_id``.
    4. Only when the payload carries ``sync_mode == "full"``: delete stored
       cameras that the payload does not reference.
    5. Apply the payload through ``_apply_config`` using the current Unix
       time (seconds) as version, clear the cache and log a snapshot.

    Args:
        config_data: Payload dict. It is modified in place: normalized
            lists are written back and snake_case keys are added to the
            ROI/bind entries.
        source: Label used in log messages and passed to
            ``_log_local_config_snapshot``.

    Returns:
        ``True`` when the configuration was applied; ``False`` when the
        payload is empty, is not a dict, or any step of the sync raised.
    """
    # Accept the wrapped form {"data": {...}} as well as the bare payload.
    if isinstance(config_data, dict) and isinstance(config_data.get("data"), dict):
        config_data = config_data.get("data")
    if not config_data:
        logger.warning(f"[EDGE] Empty config payload, source={source}")
        return False
    if not isinstance(config_data, dict):
        # Everything below uses dict access; reject other payload types here
        # instead of letting an AttributeError escape to the caller.
        logger.warning(
            "[EDGE] Config payload must be a dict, got %s, source=%s",
            type(config_data).__name__,
            source,
        )
        return False

    def _first_of(*keys):
        """Mirror ``a or b or c``: first truthy value, else the last one looked up."""
        value = None
        for key in keys:
            value = config_data.get(key)
            if value:
                break
        return value

    def _with_snake_case_ids(entries, key_pairs):
        """Add missing snake_case keys from their camelCase twins on dict entries."""
        for entry in entries:
            if not isinstance(entry, dict):
                continue  # non-dict entries are passed through untouched
            for snake_key, camel_key in key_pairs:
                if snake_key not in entry and camel_key in entry:
                    entry[snake_key] = entry[camel_key]
        return list(entries)

    rois = _first_of("rois", "roiConfigs", "roi_list")
    binds = _first_of("binds", "roiAlgoBinds", "algoBinds", "bindings")
    # NOTE(review): the camera alias keys are only counted for this log line;
    # unlike rois/binds they are not written back under "cameras" - confirm
    # whether that is intended.
    cams = _first_of("cameras", "cameraList", "camera_list")
    logger.info(
        "[EDGE] Incoming payload: cameras=%s rois=%s binds=%s source=%s",
        len(cams) if isinstance(cams, list) else 0,
        len(rois) if isinstance(rois, list) else 0,
        len(binds) if isinstance(binds, list) else 0,
        source,
    )

    if isinstance(rois, list):
        config_data["rois"] = _with_snake_case_ids(
            rois,
            (("roi_id", "roiId"), ("camera_id", "cameraId"), ("roi_type", "roiType")),
        )

    if isinstance(binds, list):
        config_data["binds"] = _with_snake_case_ids(
            binds,
            (("bind_id", "bindId"), ("roi_id", "roiId"), ("algo_code", "algoCode")),
        )

    # Local debugging: sync the in-memory config to SQLite (overwrite-style update).
    try:
        # A non-dict camera/ROI entry raises AttributeError in this section,
        # which aborts the sync before anything has been deleted.
        incoming_ids = set()
        for cam in config_data.get("cameras", []) or []:
            cid = cam.get("camera_id")
            if cid:
                incoming_ids.add(cid)
        for cid in config_data.get("camera_ids", []) or []:
            if cid:
                incoming_ids.add(cid)
        for roi in config_data.get("rois", []) or []:
            cid = roi.get("camera_id")
            if cid:
                incoming_ids.add(cid)

        if incoming_ids:
            self._clear_rois_for_camera_ids(list(incoming_ids))

        # Stored cameras missing from the payload are removed only when the
        # payload itself declares sync_mode == "full"; without that flag the
        # push is treated as partial and nothing else is cleaned up.
        sync_mode = config_data.get("sync_mode", "partial")
        if sync_mode == "full":
            self._init_database()
            # Requiring incoming_ids keeps a payload that references no
            # camera from deleting every stored camera.
            if self._db_manager and incoming_ids:
                try:
                    existing = self._db_manager.get_all_camera_configs()
                    for cam in existing:
                        old_id = cam.get("camera_id")
                        if old_id and old_id not in incoming_ids:
                            self._clear_rois_for_camera_ids([old_id])
                            self._db_manager.delete_camera_config(old_id)
                            logger.info(f"[EDGE] 清除不在推送列表中的旧摄像头: {old_id}")
                except Exception as e:
                    # Cleanup is best effort; the new config is still applied.
                    logger.warning(f"[EDGE] 清理旧摄像头失败: {e}")
        else:
            logger.info(f"[EDGE] 增量推送 (sync_mode={sync_mode}),跳过旧摄像头清理")

        version = int(time.time())
        self._apply_config(config_data, version)
        self.invalidate_all_cache()
        self._log_local_config_snapshot(source)
        return True
    except Exception as e:
        logger.error(f"本地配置同步失败: {e}")
        return False
|
|
|
|
|
|
|
2026-02-10 15:21:45 +08:00
|
|
|
|
# ==================== 缓存管理 ====================
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def invalidate_cache(self, cache_key: str):
    """Remove one entry from ``self._cache`` by calling its ``delete`` method."""
    cache = self._cache
    cache.delete(cache_key)
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def invalidate_all_cache(self):
    """Empty ``self._cache`` by calling its ``clear`` method."""
    cache = self._cache
    cache.clear()
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def get_cache_stats(self) -> Dict[str, Any]:
    """Return the statistics reported by ``self._cache.get_stats()``."""
    stats = self._cache.get_stats()
    return stats
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
|
|
|
|
|
# ==================== 健康状态 ====================
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def get_health_status(self) -> Dict[str, Any]:
    """Collect a health snapshot of the manager.

    Returns:
        A dict with the keys ``cloud_redis_connected`` and
        ``local_redis_connected`` (whether the client exists and answered
        ``ping()`` without raising), ``config_version``, ``local_version``,
        ``cache_stats`` and ``stream_listener_active`` (whether the stream
        thread exists and is alive).
    """

    def _answers_ping(client) -> bool:
        """Return True when ``client`` is set and ``ping()`` does not raise."""
        if not client:
            return False
        try:
            client.ping()
        except Exception:
            return False
        return True

    cloud_ok = _answers_ping(self._cloud_redis)
    local_ok = _answers_ping(self._local_redis)

    status = {
        "cloud_redis_connected": cloud_ok,
        "local_redis_connected": local_ok,
        "config_version": self._config_version,
        "local_version": self.get_config_version(),
        "cache_stats": self.get_cache_stats(),
        "stream_listener_active": (
            self._stream_thread is not None and
            self._stream_thread.is_alive()
        ),
    }
    return status
|
2026-02-10 15:21:45 +08:00
|
|
|
|
|
|
|
|
|
|
# ==================== 关闭 ====================
|
|
|
|
|
|
|
2026-01-29 18:33:12 +08:00
|
|
|
|
def close(self):
    """Shut the manager down and release all connections.

    Stops the config subscription first, then closes the cloud and the
    local Redis client when they are set. A client that fails to close is
    reported with a warning and does not prevent the other one from being
    closed.
    """

    def _close_client(client, closed_message, label):
        """Close one Redis client and log the outcome."""
        if not client:
            return
        try:
            client.close()
        except Exception as e:
            # Keep shutdown best effort, but do not claim the connection
            # was closed when close() actually failed.
            logger.warning("Failed to close %s Redis connection: %s", label, e)
        else:
            logger.info(closed_message)

    self.stop_config_subscription()

    _close_client(self._cloud_redis, "云端 Redis 连接已关闭", "cloud")
    _close_client(self._local_redis, "本地 Redis 连接已关闭", "local")
|
2026-01-29 18:33:12 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_config_sync_manager() -> ConfigSyncManager:
    """Get the config sync manager singleton.

    This simply calls ``ConfigSyncManager()``.
    NOTE(review): returning the same instance on every call presumably
    relies on ``ConfigSyncManager`` enforcing a single instance itself -
    confirm against the class definition.
    """
    manager = ConfigSyncManager()
    return manager
|