From 101b26fc95e4f31c6d701821cd1f1c2f409ecbc3 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 30 Jan 2026 13:51:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E7=83=AD=E6=9B=B4=E6=96=B0=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 数据库扩展: - roi_configs 新增算法参数字段(working_hours, confirm_on_duty_sec等) - 新增 config_update_log 表记录配置变更日志 Redis缓存: - ROI/摄像头配置缓存到 Redis(TTL 1小时) - sync_all_to_redis() 批量同步配置 - notify_config_change() 发布配置变更通知 热更新: - AlgorithmManager 订阅 Redis config_update 频道 - load_from_redis() 从 Redis 加载算法参数 - reload_algorithm() 热更新单个算法 - reload_all_algorithms() 重新加载所有算法 配置模型: - ROIInfo 添加算法参数字段 --- algorithms.py | 138 +++++++++++++++++++++++++++++++++- config/config_models.py | 23 ++++++ config/database.py | 107 ++++++++++++++++++++++++-- core/config_sync.py | 162 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 424 insertions(+), 6 deletions(-) diff --git a/algorithms.py b/algorithms.py index b69ef32..b0ed343 100644 --- a/algorithms.py +++ b/algorithms.py @@ -264,6 +264,7 @@ class AlgorithmManager: def __init__(self, working_hours: Optional[List[Dict]] = None): self.algorithms: Dict[str, Dict[str, Any]] = {} self.working_hours = working_hours or [] + self._update_lock = threading.Lock() self.default_params = { "leave_post": { @@ -278,7 +279,142 @@ class AlgorithmManager: "target_class": None, }, } - + + self._pubsub = None + self._pubsub_thread = None + self._running = False + + def start_config_subscription(self): + """启动配置变更订阅""" + try: + from config.settings import get_settings + settings = get_settings() + redis_client = redis.Redis( + host=settings.redis.host, + port=settings.redis.port, + db=settings.redis.db, + password=settings.redis.password, + decode_responses=True, + ) + + self._pubsub = redis_client.pubsub() + self._pubsub.subscribe("config_update") + + self._running = True + self._pubsub_thread = threading.Thread( + target=self._config_update_worker, + name="ConfigUpdateSub", + daemon=True + ) + self._pubsub_thread.start() + logger.info("已启动配置变更订阅") + except Exception as e: + logger.error(f"启动配置订阅失败: {e}") + + def _config_update_worker(self): + """配置更新订阅工作线程""" + try: + for message in self._pubsub.listen(): + if not self._running: + break + if message["type"] == "message": + try: + import json + data = json.loads(message["data"]) + if data.get("type") == "roi": + roi_ids = data.get("ids", []) + if roi_ids: + for roi_id in roi_ids: + self.reload_algorithm(roi_id) + else: + self.reload_all_algorithms() + except Exception as e: + logger.error(f"处理配置更新消息失败: {e}") + except Exception as e: + logger.error(f"配置订阅线程异常: {e}") + + def stop_config_subscription(self): + """停止配置变更订阅""" + self._running = False + if self._pubsub: + self._pubsub.close() + if self._pubsub_thread and self._pubsub_thread.is_alive(): + self._pubsub_thread.join(timeout=5) + logger.info("配置订阅已停止") + + def load_from_redis(self, roi_id: str) -> bool: + """从Redis加载单个ROI的算法配置""" + try: + from core.config_sync import get_config_sync_manager + config_manager = get_config_sync_manager() + roi_config = config_manager.get_roi_from_redis(roi_id) + + if not roi_config: + return False + + with self._update_lock: + algorithm_type = roi_config.get("algorithm_type", "leave_post") + + if algorithm_type == "leave_post": + params = { + "working_hours": roi_config.get("working_hours"), + "confirm_on_duty_sec": roi_config.get("confirm_on_duty_sec", 10), + "confirm_leave_sec": roi_config.get("confirm_leave_sec", 10), + "cooldown_sec": roi_config.get("cooldown_sec", 300), + "target_class": roi_config.get("target_class", "person"), + } + if roi_id in self.algorithms and "leave_post" in self.algorithms[roi_id]: + algo = self.algorithms[roi_id]["leave_post"] + algo.confirm_on_duty_sec = params["confirm_on_duty_sec"] + algo.confirm_leave_sec = params["confirm_leave_sec"] + algo.cooldown_sec = params["cooldown_sec"] + algo.target_class = params["target_class"] + if params["working_hours"]: + algo.working_hours = params["working_hours"] + logger.info(f"已热更新算法参数: {roi_id}") + else: + self.register_algorithm(roi_id, "leave_post", params) + logger.info(f"已从Redis加载算法: {roi_id}") + else: + params = {} + if roi_id in self.algorithms and algorithm_type in self.algorithms[roi_id]: + pass + else: + self.register_algorithm(roi_id, algorithm_type, params) + logger.info(f"已从Redis加载算法: {roi_id}") + + return True + except Exception as e: + logger.error(f"从Redis加载算法配置失败: {e}") + return False + + def reload_algorithm(self, roi_id: str) -> bool: + """重新加载单个ROI的算法配置""" + if roi_id not in self.algorithms: + return self.load_from_redis(roi_id) + + self.reset_algorithm(roi_id) + return self.load_from_redis(roi_id) + + def reload_all_algorithms(self) -> int: + """重新加载所有算法配置""" + count = 0 + try: + from core.config_sync import get_config_sync_manager + config_manager = get_config_sync_manager() + roi_configs = config_manager.get_all_roi_configs() + + for roi_config in roi_configs: + roi_id = roi_config.get("roi_id") + if self.reload_algorithm(roi_id): + count += 1 + + logger.info(f"已重新加载 {count} 个算法配置") + return count + except Exception as e: + logger.error(f"重新加载所有算法配置失败: {e}") + return count + def register_algorithm( self, roi_id: str, diff --git a/config/config_models.py b/config/config_models.py index d11bc0a..1340258 100644 --- a/config/config_models.py +++ b/config/config_models.py @@ -104,6 +104,11 @@ class ROIInfo: alert_cooldown: int = 300 enabled: bool = True extra_params: Optional[Dict[str, Any]] = None + working_hours: Optional[List[Dict]] = None # 工作时间段 + confirm_on_duty_sec: int = 10 # 在岗确认时间 + confirm_leave_sec: int = 10 # 离岗确认时间 + cooldown_sec: int = 300 # 告警冷却时间 + target_class: str = "person" # 目标类别 def to_dict(self) -> Dict[str, Any]: """转换为字典""" @@ -117,6 +122,11 @@ class ROIInfo: "alert_cooldown": self.alert_cooldown, "enabled": self.enabled, "extra_params": self.extra_params, + "working_hours": self.working_hours, + "confirm_on_duty_sec": self.confirm_on_duty_sec, + "confirm_leave_sec": self.confirm_leave_sec, + "cooldown_sec": self.cooldown_sec, + "target_class": self.target_class, } @classmethod @@ -128,6 +138,14 @@ class ROIInfo: algo_type_str = data.get("algorithm_type", "leave_post") algo_type = AlgorithmType(algo_type_str) if algo_type_str in [e.value for e in AlgorithmType] else AlgorithmType.LEAVE_POST + working_hours = data.get("working_hours") + if isinstance(working_hours, str): + import json + try: + working_hours = json.loads(working_hours) + except: + working_hours = None + return cls( roi_id=data.get("roi_id", ""), camera_id=data.get("camera_id", ""), @@ -138,6 +156,11 @@ class ROIInfo: alert_cooldown=data.get("alert_cooldown", 300), enabled=data.get("enabled", True), extra_params=data.get("extra_params"), + working_hours=working_hours, + confirm_on_duty_sec=data.get("confirm_on_duty_sec", 10), + confirm_leave_sec=data.get("confirm_leave_sec", 10), + cooldown_sec=data.get("cooldown_sec", 300), + target_class=data.get("target_class", "person"), ) def is_point_inside(self, point: List[float]) -> bool: diff --git a/config/database.py b/config/database.py index 38937b6..f8ace3a 100644 --- a/config/database.py +++ b/config/database.py @@ -162,6 +162,23 @@ class SQLiteManager: alert_cooldown INTEGER DEFAULT 300, enabled BOOLEAN DEFAULT 1, extra_params TEXT, + working_hours TEXT, + confirm_on_duty_sec INTEGER DEFAULT 10, + confirm_leave_sec INTEGER DEFAULT 10, + cooldown_sec INTEGER DEFAULT 300, + target_class TEXT DEFAULT 'person', + updated_at TEXT + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS config_update_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + config_type TEXT NOT NULL, + config_id TEXT, + old_value TEXT, + new_value TEXT, + updated_by TEXT, updated_at TEXT ) """) @@ -469,14 +486,21 @@ class SQLiteManager: cursor.execute(""" INSERT OR REPLACE INTO roi_configs ( roi_id, camera_id, roi_type, coordinates, algorithm_type, - alert_threshold, alert_cooldown, enabled, extra_params, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + alert_threshold, alert_cooldown, enabled, extra_params, + working_hours, confirm_on_duty_sec, confirm_leave_sec, + cooldown_sec, target_class, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( roi_id, camera_id, roi_type, str(coordinates), algorithm_type, kwargs.get('alert_threshold', 3), kwargs.get('alert_cooldown', 300), kwargs.get('enabled', True), str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None, + str(kwargs.get('working_hours')) if kwargs.get('working_hours') else None, + kwargs.get('confirm_on_duty_sec', 10), + kwargs.get('confirm_leave_sec', 10), + kwargs.get('cooldown_sec', 300), + kwargs.get('target_class', 'person'), now )) self._conn.commit() @@ -494,12 +518,19 @@ class SQLiteManager: if row: columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'algorithm_type', 'alert_threshold', 'alert_cooldown', - 'enabled', 'extra_params', 'updated_at'] + 'enabled', 'extra_params', 'working_hours', + 'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec', + 'target_class', 'updated_at'] result = dict(zip(columns, row)) try: result['coordinates'] = eval(result['coordinates']) except: pass + try: + if result.get('working_hours'): + result['working_hours'] = eval(result['working_hours']) + except: + pass return result return None except Exception as e: @@ -513,7 +544,9 @@ class SQLiteManager: cursor.execute("SELECT * FROM roi_configs WHERE camera_id = ?", (camera_id,)) columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'algorithm_type', 'alert_threshold', 'alert_cooldown', - 'enabled', 'extra_params', 'updated_at'] + 'enabled', 'extra_params', 'working_hours', + 'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec', + 'target_class', 'updated_at'] results = [] for row in cursor.fetchall(): r = dict(zip(columns, row)) @@ -521,6 +554,11 @@ class SQLiteManager: r['coordinates'] = eval(r['coordinates']) except: pass + try: + if r.get('working_hours'): + r['working_hours'] = eval(r['working_hours']) + except: + pass results.append(r) return results except Exception as e: @@ -534,7 +572,9 @@ class SQLiteManager: cursor.execute("SELECT * FROM roi_configs ORDER BY camera_id, roi_id") columns = ['roi_id', 'camera_id', 'roi_type', 'coordinates', 'algorithm_type', 'alert_threshold', 'alert_cooldown', - 'enabled', 'extra_params', 'updated_at'] + 'enabled', 'extra_params', 'working_hours', + 'confirm_on_duty_sec', 'confirm_leave_sec', 'cooldown_sec', + 'target_class', 'updated_at'] results = [] for row in cursor.fetchall(): r = dict(zip(columns, row)) @@ -542,6 +582,11 @@ class SQLiteManager: r['coordinates'] = eval(r['coordinates']) except: pass + try: + if r.get('working_hours'): + r['working_hours'] = eval(r['working_hours']) + except: + pass results.append(r) return results except Exception as e: @@ -558,6 +603,58 @@ class SQLiteManager: except Exception as e: logger.error(f"删除ROI配置失败: {e}") return False + + def log_config_update( + self, + config_type: str, + config_id: Optional[str], + old_value: Any, + new_value: Any, + updated_by: str = "system" + ): + """记录配置更新日志""" + try: + cursor = self._conn.cursor() + now = datetime.now().isoformat() + cursor.execute(""" + INSERT INTO config_update_log ( + config_type, config_id, old_value, new_value, updated_by, updated_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, ( + config_type, + config_id, + str(old_value) if old_value else None, + str(new_value) if new_value else None, + updated_by, + now + )) + self._conn.commit() + logger.info(f"配置更新日志已记录: {config_type}/{config_id}") + except Exception as e: + logger.error(f"记录配置更新日志失败: {e}") + + def get_config_update_log( + self, + config_type: Optional[str] = None, + limit: int = 100 + ) -> List[Dict[str, Any]]: + """获取配置更新日志""" + try: + cursor = self._conn.cursor() + query = "SELECT * FROM config_update_log WHERE 1=1" + params = [] + if config_type: + query += " AND config_type = ?" + params.append(config_type) + query += " ORDER BY id DESC LIMIT ?" + params.append(limit) + cursor.execute(query, params) + columns = ['id', 'config_type', 'config_id', 'old_value', 'new_value', + 'updated_by', 'updated_at'] + return [dict(zip(columns, row)) for row in cursor.fetchall()] + except Exception as e: + logger.error(f"获取配置更新日志失败: {e}") + return [] def get_sqlite_manager() -> SQLiteManager: diff --git a/core/config_sync.py b/core/config_sync.py index 172162a..2fcf6e7 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -383,6 +383,168 @@ class ConfigSyncManager: ), } + def _cache_roi_to_redis(self, roi_config: Dict[str, Any]) -> bool: + """将ROI配置缓存到Redis""" + if not self._redis_client: + return False + + try: + roi_id = roi_config.get("roi_id") + key = f"config:roi:{roi_id}" + + self._redis_client.hset(key, mapping={ + "roi_id": roi_id, + "camera_id": roi_config.get("camera_id", ""), + "roi_type": roi_config.get("roi_type", ""), + "coordinates": str(roi_config.get("coordinates", [])), + "algorithm_type": roi_config.get("algorithm_type", ""), + "working_hours": str(roi_config.get("working_hours", [])), + "confirm_on_duty_sec": str(roi_config.get("confirm_on_duty_sec", 10)), + "confirm_leave_sec": str(roi_config.get("confirm_leave_sec", 10)), + "cooldown_sec": str(roi_config.get("cooldown_sec", 300)), + "target_class": roi_config.get("target_class", "person"), + "enabled": str(roi_config.get("enabled", True)), + }) + + self._redis_client.expire(key, 3600) + logger.debug(f"ROI配置已缓存到Redis: {key}") + return True + except Exception as e: + logger.error(f"缓存ROI配置到Redis失败: {e}") + return False + + def _cache_camera_to_redis(self, camera_config: Dict[str, Any]) -> bool: + """将摄像头配置缓存到Redis""" + if not self._redis_client: + return False + + try: + camera_id = camera_config.get("camera_id") + key = f"config:camera:{camera_id}" + + self._redis_client.hset(key, mapping={ + "camera_id": camera_id, + "rtsp_url": camera_config.get("rtsp_url", ""), + "camera_name": camera_config.get("camera_name", ""), + "enabled": str(camera_config.get("enabled", True)), + "location": camera_config.get("location", ""), + }) + + self._redis_client.expire(key, 3600) + logger.debug(f"摄像头配置已缓存到Redis: {key}") + return True + except Exception as e: + logger.error(f"缓存摄像头配置到Redis失败: {e}") + return False + + def sync_all_to_redis(self) -> int: + """同步所有配置到Redis缓存""" + if not self._redis_client: + return 0 + + count = 0 + try: + cameras = self._db_manager.get_all_camera_configs() + for camera in cameras: + if self._cache_camera_to_redis(camera): + count += 1 + + rois = self._db_manager.get_all_roi_configs() + for roi in rois: + if self._cache_roi_to_redis(roi): + count += 1 + + logger.info(f"已同步 {count} 条配置到Redis缓存") + return count + except Exception as e: + logger.error(f"同步配置到Redis失败: {e}") + return count + + def get_roi_from_redis(self, roi_id: str) -> Optional[Dict[str, Any]]: + """从Redis获取ROI配置""" + if not self._redis_client: + return None + + try: + key = f"config:roi:{roi_id}" + data = self._redis_client.hgetall(key) + if data: + if data.get('coordinates'): + data['coordinates'] = eval(data['coordinates']) + if data.get('working_hours'): + data['working_hours'] = eval(data['working_hours']) + data['confirm_on_duty_sec'] = int(data.get('confirm_on_duty_sec', 10)) + data['confirm_leave_sec'] = int(data.get('confirm_leave_sec', 10)) + data['cooldown_sec'] = int(data.get('cooldown_sec', 300)) + data['enabled'] = data.get('enabled', 'True') == 'True' + return data + return None + except Exception as e: + logger.error(f"从Redis获取ROI配置失败: {e}") + return None + + def get_camera_from_redis(self, camera_id: str) -> Optional[Dict[str, Any]]: + """从Redis获取摄像头配置""" + if not self._redis_client: + return None + + try: + key = f"config:camera:{camera_id}" + data = self._redis_client.hgetall(key) + if data: + data['enabled'] = data.get('enabled', 'True') == 'True' + return data + return None + except Exception as e: + logger.error(f"从Redis获取摄像头配置失败: {e}") + return None + + def notify_config_change(self, config_type: str, config_ids: List[str]): + """通知配置变更(发布到Redis频道)""" + if not self._redis_client: + return + + try: + message = { + "type": config_type, + "ids": config_ids, + "timestamp": datetime.now().isoformat(), + } + self._redis_client.publish("config_update", json.dumps(message)) + logger.info(f"已发布配置变更通知: {config_type} - {config_ids}") + except Exception as e: + logger.error(f"发布配置变更通知失败: {e}") + + def clear_redis_cache(self, config_type: Optional[str] = None): + """清理Redis缓存""" + if not self._redis_client: + return + + try: + if config_type == "roi": + keys = self._redis_client.keys("config:roi:*") + if keys: + self._redis_client.delete(*keys) + elif config_type == "camera": + keys = self._redis_client.keys("config:camera:*") + if keys: + self._redis_client.delete(*keys) + else: + keys = self._redis_client.keys("config:*") + if keys: + self._redis_client.delete(*keys) + logger.info(f"已清理Redis缓存: {config_type or 'all'}") + except Exception as e: + logger.error(f"清理Redis缓存失败: {e}") + + def reload_algorithms(self): + """重新加载所有算法配置""" + self.invalidate_all_cache() + self.clear_redis_cache() + count = self.sync_all_to_redis() + self.notify_config_change("roi", []) + logger.info(f"算法配置已重新加载,更新了 {count} 条缓存") + def close(self): """关闭管理器""" self.stop_config_subscription()