From 181623428a3a2cfb656e4647ee3b68f13d070b36 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 11 Feb 2026 09:57:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(aiot):=20=E5=91=8A=E8=AD=A6=E5=86=B7?= =?UTF-8?q?=E5=8D=B4=E6=97=B6=E9=97=B4=E8=B0=83=E6=95=B4=20+=20=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E6=9C=AC=E5=9C=B0=E4=BF=9D=E7=95=99=20+=20=E4=B8=AD?= =?UTF-8?q?=E6=96=87=E8=B7=AF=E5=BE=84=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 离岗检测冷却时间: 300s → 600s(10分钟) - 入侵检测冷却时间: 120s → 300s(5分钟) - 入侵告警级别改为高(alarm_level=3) - COS 不可用时保留本地截图文件,不再上报后删除 - 修复 cv2.imwrite 中文路径失败,改用 imencode + write_bytes - 配置订阅在 LOCAL 模式下跳过 Redis 连接 Co-Authored-By: Claude Opus 4.6 --- algorithms.py | 32 ++++-- config/database.py | 4 +- config/settings.py | 46 +++++++- core/alarm_upload_worker.py | 94 ++++++++++++---- core/config_sync.py | 219 +++++++++++++++++++++++++++++++++++- core/preprocessor.py | 26 +++-- core/storage_manager.py | 55 +++++---- main.py | 122 +++++++++++++++++++- 8 files changed, 532 insertions(+), 66 deletions(-) diff --git a/algorithms.py b/algorithms.py index b9afe3e..ef07d0d 100644 --- a/algorithms.py +++ b/algorithms.py @@ -27,7 +27,7 @@ class LeavePostAlgorithm: self, confirm_on_duty_sec: int = 10, confirm_leave_sec: int = 10, - cooldown_sec: int = 300, + cooldown_sec: int = 600, working_hours: Optional[List[Dict]] = None, target_class: Optional[str] = "person", ): @@ -223,7 +223,7 @@ class LeavePostAlgorithm: class IntrusionAlgorithm: def __init__( self, - cooldown_seconds: int = 120, + cooldown_seconds: int = 300, confirm_seconds: int = 5, target_class: Optional[str] = None, ): @@ -297,6 +297,7 @@ class IntrusionAlgorithm: "camera_id": camera_id, "bbox": bbox, "alert_type": "intrusion", + "alarm_level": 3, "message": "检测到周界入侵", }] @@ -395,11 +396,11 @@ class AlgorithmManager: "leave_post": { "confirm_on_duty_sec": 10, "confirm_leave_sec": 10, - "cooldown_sec": 300, + "cooldown_sec": 600, "target_class": "person", }, "intrusion": { - "cooldown_seconds": 120, + "cooldown_seconds": 300, "confirm_seconds": 5, "target_class": None, }, @@ -419,6 +420,9 @@ class AlgorithmManager: try: from config.settings import get_settings settings = get_settings() + if settings.config_sync_mode != "REDIS": + logger.info("CONFIG_SYNC_MODE=LOCAL: 跳过 Redis 配置订阅") + return redis_client = redis.Redis( host=settings.redis.host, port=settings.redis.port, @@ -496,7 +500,17 @@ class AlgorithmManager: with self._update_lock: roi_id = bind_config.get("roi_id") algo_code = bind_config.get("algo_code", "leave_post") - params = bind_config.get("params", {}) + raw_params = bind_config.get("params") + if isinstance(raw_params, str): + try: + import json + params = json.loads(raw_params) or {} + except Exception: + params = {} + elif isinstance(raw_params, dict): + params = raw_params + else: + params = {} if roi_id not in self.algorithms: self.algorithms[roi_id] = {} @@ -507,7 +521,7 @@ class AlgorithmManager: algo_params = { "confirm_on_duty_sec": params.get("confirm_on_duty_sec", 10), "confirm_leave_sec": params.get("confirm_leave_sec", 10), - "cooldown_sec": params.get("cooldown_sec", 300), + "cooldown_sec": params.get("cooldown_sec", 600), "working_hours": params.get("working_hours", []), "target_class": params.get("target_class", bind_config.get("target_class", "person")), } @@ -532,7 +546,7 @@ class AlgorithmManager: logger.info(f"已从Redis加载算法: {key}") elif algo_code == "intrusion": algo_params = { - "cooldown_seconds": params.get("cooldown_seconds", 120), + "cooldown_seconds": params.get("cooldown_seconds", 300), "confirm_seconds": params.get("confirm_seconds", 5), "target_class": params.get("target_class", bind_config.get("target_class")), } @@ -625,13 +639,13 @@ class AlgorithmManager: self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params.get("confirm_on_duty_sec", 10), confirm_leave_sec=algo_params.get("confirm_leave_sec", 10), - cooldown_sec=algo_params.get("cooldown_sec", 300), + cooldown_sec=algo_params.get("cooldown_sec", 600), working_hours=roi_working_hours, target_class=algo_params.get("target_class", "person"), ) elif algorithm_type == "intrusion": self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm( - cooldown_seconds=algo_params.get("cooldown_seconds", 120), + cooldown_seconds=algo_params.get("cooldown_seconds", 300), confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), ) diff --git a/config/database.py b/config/database.py index 2698433..3048745 100644 --- a/config/database.py +++ b/config/database.py @@ -266,7 +266,7 @@ class SQLiteManager: 'param_schema': json.dumps({ "confirm_on_duty_sec": {"type": "int", "default": 10, "min": 1}, "confirm_leave_sec": {"type": "int", "default": 10, "min": 1}, - "cooldown_sec": {"type": "int", "default": 300, "min": 0}, + "cooldown_sec": {"type": "int", "default": 600, "min": 0}, "working_hours": {"type": "list", "default": []}, }), 'description': '检测人员是否在岗,支持工作时间段配置' @@ -276,7 +276,7 @@ class SQLiteManager: 'algo_name': '周界入侵检测', 'target_class': 'person', 'param_schema': json.dumps({ - "cooldown_seconds": {"type": "int", "default": 120, "min": 0}, + "cooldown_seconds": {"type": "int", "default": 300, "min": 0}, "confirm_seconds": {"type": "int", "default": 5, "min": 1}, }), 'description': '检测人员进入指定区域,支持确认时间和冷却时间配置' diff --git a/config/settings.py b/config/settings.py index 499366d..61673f6 100644 --- a/config/settings.py +++ b/config/settings.py @@ -92,7 +92,7 @@ class COSConfig: @dataclass class AlarmUploadConfig: """告警上报配置""" - cloud_api_url: str = "http://124.221.55.225:8000" + cloud_api_url: str = "http://localhost:8000" edge_token: str = "" retry_max: int = 3 retry_interval: int = 5 @@ -123,6 +123,18 @@ class InferenceConfig: fp16_mode: bool = True +# ===================== Debug / Local Sync ===================== + +@dataclass +class DebugConfig: + """本地调试相关配置""" + enabled: bool = True + host: str = "127.0.0.1" + port: int = 9001 + reload_signal_file: str = "./config/reload.signal" + local_config_path: str = "./config/local_config.json" + + # COCO 数据集类别名称(YOLO 模型使用) COCO_CLASS_NAMES = [ "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", @@ -177,6 +189,20 @@ class Settings: def _load_env_vars(self): """从环境变量加载配置""" + # 加载 .env 文件(如果 python-dotenv 可用) + try: + from dotenv import load_dotenv + load_dotenv() + except ImportError: + pass + + base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + def _abs_path(path: str) -> str: + if not path: + return path + return path if os.path.isabs(path) else os.path.normpath(os.path.join(base_dir, path)) + self.database = DatabaseConfig( host=os.getenv("DB_HOST", "localhost"), port=int(os.getenv("DB_PORT", "3306")), @@ -186,8 +212,8 @@ class Settings: ) self.sqlite = SQLiteConfig( - db_path=os.getenv("SQLITE_DB_PATH", "./data/security_events.db"), - image_dir=os.getenv("SQLITE_IMAGE_DIR", "./data/captures"), + db_path=_abs_path(os.getenv("SQLITE_DB_PATH", "./data/security_events.db")), + image_dir=_abs_path(os.getenv("SQLITE_IMAGE_DIR", "./data/captures")), retention_days=int(os.getenv("SQLITE_RETENTION_DAYS", "7")), wal_mode=os.getenv("SQLITE_WAL_MODE", "1") == "1", ) @@ -229,11 +255,13 @@ class Settings: ) self.alarm_upload = AlarmUploadConfig( - cloud_api_url=os.getenv("CLOUD_API_URL", "http://124.221.55.225:8000"), + cloud_api_url=os.getenv("CLOUD_API_URL", "http://localhost:8000"), edge_token=os.getenv("EDGE_TOKEN", ""), retry_max=int(os.getenv("ALARM_RETRY_MAX", "3")), retry_interval=int(os.getenv("ALARM_RETRY_INTERVAL", "5")), ) + + self.alarm_upload_enabled = os.getenv("ALARM_UPLOAD_ENABLED", "1") == "1" self.video_stream = VideoStreamConfig( default_fps=int(os.getenv("VIDEO_DEFAULT_FPS", "5")), @@ -248,6 +276,16 @@ class Settings: conf_threshold=float(os.getenv("CONF_THRESHOLD", "0.5")), nms_threshold=float(os.getenv("NMS_THRESHOLD", "0.45")), ) + + self.config_sync_mode = os.getenv("CONFIG_SYNC_MODE", "LOCAL").upper() + + self.debug = DebugConfig( + enabled=os.getenv("DEBUG_SERVER_ENABLED", "1") == "1", + host=os.getenv("DEBUG_SERVER_HOST", "127.0.0.1"), + port=int(os.getenv("DEBUG_SERVER_PORT", "9001")), + reload_signal_file=_abs_path(os.getenv("DEBUG_RELOAD_SIGNAL_FILE", "./config/reload.signal")), + local_config_path=_abs_path(os.getenv("LOCAL_CONFIG_PATH", "./config/local_config.json")), + ) self.log_level = os.getenv("LOG_LEVEL", "INFO") self.log_dir = os.getenv("LOG_DIR", "./logs") diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index c661bd4..d9272f4 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -83,6 +83,9 @@ class AlarmUploadWorker: self._logger.error(f"Worker Redis 连接失败: {e}") return + # 启动时验证云端 API 可达性 + self._check_cloud_api() + self._stop_event.clear() self._thread = threading.Thread( target=self._worker_loop, @@ -92,6 +95,29 @@ class AlarmUploadWorker: self._thread.start() self._logger.info("AlarmUploadWorker 已启动") + def _check_cloud_api(self): + """启动时检查云端 API 是否可达(仅记录日志,不阻断启动)""" + upload_cfg = self._settings.alarm_upload + base_url = upload_cfg.cloud_api_url.rstrip("/") + health_url = f"{base_url}/health" + report_url = f"{base_url}/admin-api/aiot/alarm/edge/report" + + self._logger.info(f"云端 API 地址: {base_url}") + self._logger.info(f"告警上报端点: {report_url}") + + try: + resp = requests.get(health_url, timeout=5) + if resp.status_code == 200: + self._logger.info(f"云端健康检查通过: {health_url}") + else: + self._logger.warning( + f"云端健康检查异常: {health_url}, status={resp.status_code}" + ) + except requests.ConnectionError: + self._logger.warning(f"云端不可达: {health_url},请确认服务已启动") + except Exception as e: + self._logger.warning(f"云端健康检查失败: {e}") + def stop(self): """停止 worker""" if not self._thread or not self._thread.is_alive(): @@ -156,18 +182,32 @@ class AlarmUploadWorker: snapshot_local_path = alarm_data.get("snapshot_local_path") object_key = None - if snapshot_local_path and os.path.exists(snapshot_local_path): - object_key = self._upload_snapshot_to_cos( - snapshot_local_path, - alarm_id, - alarm_data.get("device_id", "unknown"), - ) - if object_key is None: - # COS 上传失败,进入重试 - self._handle_retry(alarm_json, "COS 上传失败") - return - else: - if snapshot_local_path: + if snapshot_local_path: + # 截图是异步保存的,等待文件写入完成(最多 3 秒) + if not os.path.exists(snapshot_local_path): + for _ in range(6): + time.sleep(0.5) + if os.path.exists(snapshot_local_path): + break + + if os.path.exists(snapshot_local_path): + object_key = self._upload_snapshot_to_cos( + snapshot_local_path, + alarm_id, + alarm_data.get("device_id", "unknown"), + ) + if object_key is None: + # COS 上传失败,进入重试 + self._handle_retry(alarm_json, "COS 上传失败") + return + elif object_key == "": + # COS 未配置,使用本地截图路径作为回退 + captures_base = os.path.join("data", "captures") + rel_path = os.path.relpath(snapshot_local_path, captures_base) + rel_path = rel_path.replace("\\", "/") + object_key = f"local:{rel_path}" + self._logger.info(f"使用本地截图路径: {object_key}") + else: self._logger.warning(f"截图文件不存在: {snapshot_local_path}") # Step 2: HTTP 上报告警元数据 @@ -190,8 +230,8 @@ class AlarmUploadWorker: self._stats["processed"] += 1 self._logger.info(f"告警上报成功: {alarm_id}") - # 可选:删除本地截图 - if snapshot_local_path and os.path.exists(snapshot_local_path): + # 仅在 COS 上传成功时删除本地截图;本地回退模式(local:)不删除 + if snapshot_local_path and os.path.exists(snapshot_local_path) and object_key and not object_key.startswith("local:"): try: os.remove(snapshot_local_path) self._logger.debug(f"已删除本地截图: {snapshot_local_path}") @@ -266,7 +306,8 @@ class AlarmUploadWorker: 是否上报成功 """ upload_cfg = self._settings.alarm_upload - url = f"{upload_cfg.cloud_api_url}/admin-api/aiot/alarm/edge/report" + base_url = upload_cfg.cloud_api_url.rstrip("/") + url = f"{base_url}/admin-api/aiot/alarm/edge/report" headers = { "Content-Type": "application/json", @@ -274,10 +315,16 @@ class AlarmUploadWorker: if upload_cfg.edge_token: headers["Authorization"] = f"Bearer {upload_cfg.edge_token}" + # 过滤掉内部字段(以 _ 开头的控制字段不发送到云端) + report_data = {k: v for k, v in alarm_data.items() if not k.startswith("_")} + + self._logger.debug(f"HTTP 上报 URL: {url}") + self._logger.debug(f"HTTP 上报数据: {report_data}") + try: response = requests.post( url, - json=alarm_data, + json=report_data, headers=headers, timeout=10, ) @@ -293,17 +340,26 @@ class AlarmUploadWorker: ) return False else: - self._logger.warning(f"HTTP 上报失败: status={response.status_code}") + # 记录详细的错误信息便于排查 + resp_text = "" + try: + resp_text = response.text[:500] + except Exception: + pass + self._logger.warning( + f"HTTP 上报失败: url={url}, status={response.status_code}, " + f"body={resp_text}" + ) return False except requests.Timeout: self._logger.warning(f"HTTP 上报超时: {url}") return False except requests.ConnectionError as e: - self._logger.warning(f"HTTP 上报连接失败: {e}") + self._logger.warning(f"HTTP 上报连接失败: {url}, error={e}") return False except Exception as e: - self._logger.error(f"HTTP 上报异常: {e}") + self._logger.error(f"HTTP 上报异常: {url}, error={e}") return False def _handle_retry(self, alarm_json: str, error: str): diff --git a/core/config_sync.py b/core/config_sync.py index 2caca4f..d2e4f0c 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -13,6 +13,7 @@ import json import logging +import os import threading import time from datetime import datetime @@ -136,6 +137,7 @@ class ConfigSyncManager: settings = get_settings() self._device_id = settings.mqtt.device_id # 边缘节点 ID self._config_version = settings.config_version + self._sync_mode = settings.config_sync_mode self._cache = ConfigCache() self._db_manager = None @@ -154,8 +156,11 @@ class ConfigSyncManager: self._version_control = get_version_control() self._initialized = True - self._init_local_redis() - self._init_cloud_redis() + if self._sync_mode == "REDIS": + self._init_local_redis() + self._init_cloud_redis() + else: + logger.info("CONFIG_SYNC_MODE=LOCAL: 跳过 Redis 初始化,仅使用本地 SQLite") # ==================== Redis 初始化 ==================== @@ -239,6 +244,10 @@ class ConfigSyncManager: 2. 从云端同步最新配置(如果可用) 3. 启动 Stream 监听线程 """ + if self._sync_mode != "REDIS": + self._log_local_config_snapshot("LOCAL") + return + # Step 1: 从本地 Redis 加载已有配置到 SQLite self._load_from_local_redis() @@ -268,6 +277,24 @@ class ConfigSyncManager: self._stream_thread.join(timeout=5) logger.info("配置 Stream 监听线程已停止") + def _log_local_config_snapshot(self, source: str = "SQLite"): + self._init_database() + if not self._db_manager: + logger.warning(f"[EDGE] Local config snapshot skipped (no SQLite). source={source}") + return + try: + cameras = self._db_manager.get_all_camera_configs() + rois = self._db_manager.get_all_roi_configs() + binds = [] + for roi in rois: + binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"])) + logger.info(f"[EDGE] Loading config from local db ({source})...") + logger.info(f"[EDGE] Camera count = {len(cameras)}") + logger.info(f"[EDGE] ROI count = {len(rois)}") + logger.info(f"[EDGE] Algorithm bindings = {len(binds)}") + except Exception as e: + logger.warning(f"[EDGE] Local config snapshot failed: {e}") + # ==================== Stream 监听 ==================== def _listen_config_stream(self): @@ -517,9 +544,20 @@ class ConfigSyncManager: # 同步 ROI 配置 rois = config_data.get("rois", []) for roi in rois: + if not isinstance(roi, dict): + logger.error(f"?? ROI ????: invalid roi item type={type(roi)}") + continue try: coordinates = roi.get("coordinates", []) - if coordinates and isinstance(coordinates[0], dict): + # ?? rectangle dict ? polygon list-of-dict ?? + if isinstance(coordinates, dict): + coordinates = { + "x": coordinates.get("x", 0), + "y": coordinates.get("y", 0), + "w": coordinates.get("w", 0), + "h": coordinates.get("h", 0), + } + elif coordinates and isinstance(coordinates, list) and isinstance(coordinates[0], dict): coordinates = [[p.get("x", 0), p.get("y", 0)] for p in coordinates] self._db_manager.save_roi_config( @@ -698,6 +736,179 @@ class ConfigSyncManager: logger.error(f"获取当前配置失败: {e}") return None + def _get_current_config_from_local_redis(self) -> Optional[dict]: + if not self._local_redis: + return None + try: + config_json = self._local_redis.get(LOCAL_CONFIG_CURRENT) + return json.loads(config_json) if config_json else None + except Exception: + return None + + def get_bindings_from_redis(self, roi_id: str) -> List[Dict[str, Any]]: + """获取 ROI 绑定(LOCAL 模式从 SQLite 读取)""" + if self._sync_mode == "REDIS": + config = self._get_current_config_from_local_redis() + if config: + binds = config.get("binds", []) + if roi_id: + binds = [b for b in binds if b.get("roi_id") == roi_id] + return binds + + self._init_database() + if not self._db_manager: + return [] + if roi_id: + return self._db_manager.get_bindings_by_roi(roi_id) + + binds: List[Dict[str, Any]] = [] + rois = self._db_manager.get_all_roi_configs() + for roi in rois: + binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"])) + return binds + + def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]: + """获取单个 bind(LOCAL 模式从 SQLite 读取)""" + if self._sync_mode == "REDIS": + config = self._get_current_config_from_local_redis() + if config: + for bind in config.get("binds", []): + if bind.get("bind_id") == bind_id: + return bind + + self._init_database() + if not self._db_manager: + return None + return self._db_manager.get_roi_algo_bind(bind_id) + + def reload_local_config_from_file(self) -> bool: + """本地调试:从 JSON 文件读取配置并同步到 SQLite""" + settings = get_settings() + config_path = settings.debug.local_config_path + try: + if not os.path.exists(config_path): + logger.warning(f"本地配置文件不存在: {config_path}") + return False + with open(config_path, "r", encoding="utf-8") as f: + config_data = json.load(f) + return self.reload_local_config(config_data, source="FILE") + except Exception as e: + logger.error(f"本地配置文件加载失败: {e}") + return False + + def _clear_rois_for_camera_ids(self, camera_ids: List[str]): + if not camera_ids: + return + self._init_database() + if not self._db_manager: + return + for camera_id in camera_ids: + rois = self._db_manager.get_rois_by_camera(camera_id) + for roi in rois: + roi_id = roi.get("roi_id") + if roi_id: + self._db_manager.delete_bindings_by_roi(roi_id) + self._db_manager.delete_roi_config(roi_id) + + def reload_local_config(self, config_data: dict, source: str = "LOCAL") -> bool: + # ?????????????? camelCase key + # ???? data ?? + if isinstance(config_data, dict) and isinstance(config_data.get("data"), dict): + config_data = config_data.get("data") + if not config_data: + logger.warning(f"[EDGE] Empty config payload, source={source}") + return False + try: + rois = (config_data.get("rois") or config_data.get("roiConfigs") or config_data.get("roi_list") or []) + binds = (config_data.get("binds") or config_data.get("roiAlgoBinds") or config_data.get("algoBinds") or config_data.get("bindings") or []) + cams = (config_data.get("cameras") or config_data.get("cameraList") or config_data.get("camera_list") or []) + logger.info("[EDGE] Incoming payload: cameras=%s rois=%s binds=%s source=%s", + len(cams) if isinstance(cams, list) else 0, + len(rois) if isinstance(rois, list) else 0, + len(binds) if isinstance(binds, list) else 0, + source) + except Exception: + pass + + rois = config_data.get("rois") or config_data.get("roiConfigs") or config_data.get("roi_list") + if isinstance(rois, list): + norm_rois = [] + for r in rois: + if not isinstance(r, dict): + norm_rois.append(r) + continue + if "roi_id" not in r and "roiId" in r: + r["roi_id"] = r.get("roiId") + if "camera_id" not in r and "cameraId" in r: + r["camera_id"] = r.get("cameraId") + if "roi_type" not in r and "roiType" in r: + r["roi_type"] = r.get("roiType") + norm_rois.append(r) + config_data["rois"] = norm_rois + + binds = config_data.get("binds") or config_data.get("roiAlgoBinds") or config_data.get("algoBinds") or config_data.get("bindings") + if isinstance(binds, list): + norm_binds = [] + for b in binds: + if not isinstance(b, dict): + norm_binds.append(b) + continue + if "bind_id" not in b and "bindId" in b: + b["bind_id"] = b.get("bindId") + if "roi_id" not in b and "roiId" in b: + b["roi_id"] = b.get("roiId") + if "algo_code" not in b and "algoCode" in b: + b["algo_code"] = b.get("algoCode") + norm_binds.append(b) + config_data["binds"] = norm_binds + # 本地调试:从内存配置同步到 SQLite(支持覆盖式更新) + try: + camera_ids: List[str] = [] + for cam in config_data.get("cameras", []) or []: + cid = cam.get("camera_id") + if cid: + camera_ids.append(cid) + for cid in config_data.get("camera_ids", []) or []: + if cid: + camera_ids.append(cid) + for roi in config_data.get("rois", []) or []: + cid = roi.get("camera_id") + if cid: + camera_ids.append(cid) + + incoming_ids = set(camera_ids) + + if camera_ids: + self._clear_rois_for_camera_ids(list(incoming_ids)) + + # 仅全量推送时,清除不在本次推送中的旧摄像头 + # sync_mode="full" 由 push-all 设置;单摄像头推送不带此标志,不清理 + sync_mode = config_data.get("sync_mode", "partial") + if sync_mode == "full": + self._init_database() + if self._db_manager and incoming_ids: + try: + existing = self._db_manager.get_all_camera_configs() + for cam in existing: + old_id = cam.get("camera_id") + if old_id and old_id not in incoming_ids: + self._clear_rois_for_camera_ids([old_id]) + self._db_manager.delete_camera_config(old_id) + logger.info(f"[EDGE] 清除不在推送列表中的旧摄像头: {old_id}") + except Exception as e: + logger.warning(f"[EDGE] 清理旧摄像头失败: {e}") + else: + logger.info(f"[EDGE] 增量推送 (sync_mode={sync_mode}),跳过旧摄像头清理") + + version = int(time.time()) + self._apply_config(config_data, version) + self.invalidate_all_cache() + self._log_local_config_snapshot(source) + return True + except Exception as e: + logger.error(f"本地配置同步失败: {e}") + return False + # ==================== 缓存管理 ==================== def invalidate_cache(self, cache_key: str): @@ -763,4 +974,4 @@ class ConfigSyncManager: def get_config_sync_manager() -> ConfigSyncManager: """获取配置同步管理器单例""" - return ConfigSyncManager() + return ConfigSyncManager() \ No newline at end of file diff --git a/core/preprocessor.py b/core/preprocessor.py index 62760e5..5571b70 100644 --- a/core/preprocessor.py +++ b/core/preprocessor.py @@ -58,14 +58,26 @@ class ROICropper: def _crop_rectangle( self, image: np.ndarray, - coordinates: List[List[float]] + coordinates: Union[List[List[float]], Dict[str, float]] ) -> Optional[np.ndarray]: - """裁剪矩形区域""" - if len(coordinates) < 2: - return None - - x1, y1 = int(coordinates[0][0]), int(coordinates[0][1]) - x2, y2 = int(coordinates[1][0]), int(coordinates[1][1]) + """裁剪矩形区域 + + 支持两种坐标格式: + 1. dict: {"x": float, "y": float, "w": float, "h": float} — 归一化坐标(0-1) + 2. list: [[x1,y1],[x2,y2]] — 像素坐标 + """ + img_h, img_w = image.shape[:2] + + if isinstance(coordinates, dict): + x1 = int(coordinates["x"] * img_w) + y1 = int(coordinates["y"] * img_h) + x2 = int((coordinates["x"] + coordinates["w"]) * img_w) + y2 = int((coordinates["y"] + coordinates["h"]) * img_h) + else: + if len(coordinates) < 2: + return None + x1, y1 = int(coordinates[0][0]), int(coordinates[0][1]) + x2, y2 = int(coordinates[1][0]), int(coordinates[1][1]) x1 = max(0, min(x1, image.shape[1] - 1)) y1 = max(0, min(y1, image.shape[0] - 1)) diff --git a/core/storage_manager.py b/core/storage_manager.py index 4d7e3d0..c7d41d4 100644 --- a/core/storage_manager.py +++ b/core/storage_manager.py @@ -44,9 +44,14 @@ class PendingCapture: class ImageStorageManager: """图片存储管理器""" - + _instance = None _lock = threading.Lock() + + @staticmethod + def _sanitize_filename(name: str) -> str: + """清理文件名中的非法字符(/ \\ 等路径分隔符替换为下划线)""" + return name.replace("/", "_").replace("\\", "_") def __new__(cls, config: Optional[CaptureConfig] = None): if cls._instance is None: @@ -103,17 +108,14 @@ class ImageStorageManager: logger.error(f"图片保存异常: {e}") def _save_image(self, capture: PendingCapture) -> Optional[str]: - """保存单张图片""" + """保存单张图片(使用 imencode+write_bytes 避免中文路径问题)""" try: image = capture.image - + if image is None: self._failed_count += 1 return None - - if len(image.shape) == 3 and image.shape[2] == 3: - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - + if image.shape[1] > self.config.max_width or image.shape[0] > self.config.max_height: scale = min( self.config.max_width / image.shape[1], @@ -124,28 +126,33 @@ class ImageStorageManager: int(image.shape[0] * scale) ) image = cv2.resize(image, new_size, interpolation=cv2.INTER_AREA) - + date_dir = capture.timestamp.strftime("%Y%m%d") save_dir = Path(self.config.image_dir) / date_dir save_dir.mkdir(parents=True, exist_ok=True) - - filename = f"{capture.camera_id}_{capture.alert_id}{self.config.save_format}" + + safe_camera_id = self._sanitize_filename(capture.camera_id) + filename = f"{safe_camera_id}_{capture.alert_id}{self.config.save_format}" filepath = save_dir / filename - - success = cv2.imwrite( - str(filepath), + + # 使用 imencode + write_bytes 代替 imwrite, + # 因为 cv2.imwrite 在 Windows 上无法处理中文路径 + success, buffer = cv2.imencode( + self.config.save_format, image, [cv2.IMWRITE_JPEG_QUALITY, self.config.quality] ) - + if success: + filepath.write_bytes(buffer.tobytes()) self._saved_count += 1 - logger.debug(f"图片已保存: {filepath}") + logger.info(f"图片已保存: {filepath}") return str(filepath) else: + logger.warning(f"图片编码失败: {filepath}") self._failed_count += 1 return None - + except Exception as e: logger.error(f"保存图片失败: {e}") self._failed_count += 1 @@ -158,20 +165,28 @@ class ImageStorageManager: alert_id: str, timestamp: Optional[datetime] = None ) -> Optional[str]: - """异步保存抓拍图片""" + """异步保存抓拍图片,返回预计的文件路径""" + ts = timestamp or datetime.now() capture = PendingCapture( image=image, camera_id=camera_id, alert_id=alert_id, - timestamp=timestamp or datetime.now() + timestamp=ts, ) self._save_queue.put(capture) - return f"" + + # 返回确定性的文件路径(与 _save_image 使用相同的命名规则) + date_dir = ts.strftime("%Y%m%d") + safe_camera_id = self._sanitize_filename(camera_id) + filename = f"{safe_camera_id}_{alert_id}{self.config.save_format}" + filepath = Path(self.config.image_dir) / date_dir / filename + return str(filepath) def get_image_path(self, camera_id: str, alert_id: str) -> Optional[str]: """获取已保存图片路径""" date_str = datetime.now().strftime("%Y%m%d") - filename = f"{camera_id}_{alert_id}{self.config.save_format}" + safe_camera_id = self._sanitize_filename(camera_id) + filename = f"{safe_camera_id}_{alert_id}{self.config.save_format}" filepath = Path(self.config.image_dir) / date_str / filename if filepath.exists(): diff --git a/main.py b/main.py index 4ecf002..47a384f 100644 --- a/main.py +++ b/main.py @@ -14,6 +14,7 @@ from typing import Dict, Any, Optional, List, Tuple from config.settings import get_settings, Settings from core.config_sync import get_config_sync_manager, ConfigSyncManager +from core.debug_http_server import start_debug_http_server from core.video_stream import MultiStreamManager, VideoFrame from core.preprocessor import ImagePreprocessor from core.tensorrt_engine import TensorRTEngine, EngineManager @@ -48,6 +49,9 @@ class EdgeInferenceService: self._reporter: Optional[ResultReporter] = None self._alarm_worker: Optional[AlarmUploadWorker] = None self._algorithm_manager: Optional[AlgorithmManager] = None + self._debug_reload_thread: Optional[threading.Thread] = None + self._debug_http_server = None + self._debug_http_thread: Optional[threading.Thread] = None self._processing_threads: Dict[str, threading.Thread] = {} self._stop_event = threading.Event() @@ -90,6 +94,13 @@ class EdgeInferenceService: try: self._config_manager = get_config_sync_manager() self._config_manager.start_config_subscription() + if self._settings.config_sync_mode == "LOCAL" and self._config_manager: + def _on_config_update(topic, data): + if self._algorithm_manager: + self._algorithm_manager.reload_all_algorithms() + # 配置更新后动态加载新摄像头流 + self._reload_cameras() + self._config_manager.register_callback("config_update", _on_config_update) self._logger.info("配置管理器初始化成功") except Exception as e: self._logger.error(f"配置管理器初始化失败: {e}") @@ -127,6 +138,9 @@ class EdgeInferenceService: self._logger.info("后处理器初始化成功") def _init_reporter(self): + if self._settings.config_sync_mode == "LOCAL" and not self._settings.alarm_upload_enabled: + self._logger.info("LOCAL 模式且 ALARM_UPLOAD_ENABLED=0:跳过告警上报组件初始化") + return """初始化结果上报器""" try: self._reporter = ResultReporter() @@ -157,6 +171,70 @@ class EdgeInferenceService: except Exception as e: self._logger.error(f"算法管理器初始化失败: {e}") + def _start_debug_reload_watcher(self): + """本地调试:监听文件触发同步""" + if self._settings.config_sync_mode != "LOCAL": + return + if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled: + return + if not self._config_manager: + return + + signal_file = self._settings.debug.reload_signal_file + + def worker(): + last_mtime = None + self._logger.info(f"[DEBUG] 本地同步模式已启用,监听: {signal_file}") + while not self._stop_event.is_set(): + try: + if os.path.exists(signal_file): + mtime = os.path.getmtime(signal_file) + if last_mtime is None: + last_mtime = mtime + elif mtime != last_mtime: + last_mtime = mtime + ok = self._config_manager.reload_local_config_from_file() + if self._algorithm_manager: + self._algorithm_manager.reload_all_algorithms() + self._logger.info(f"[DEBUG] 本地配置已重新加载: {ok}") + time.sleep(1.0) + except Exception as e: + self._logger.warning(f"[DEBUG] 监听本地配置失败: {e}") + time.sleep(1.0) + + self._debug_reload_thread = threading.Thread( + target=worker, + name="LocalConfigReloadWatcher", + daemon=True, + ) + self._debug_reload_thread.start() + + def _start_debug_http_server(self): + """本地调试:启动 HTTP 同步接口""" + if self._settings.config_sync_mode != "LOCAL": + return + if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled: + return + if self._debug_http_server is not None: + return + + host = self._settings.debug.host + port = self._settings.debug.port + self._debug_http_server = start_debug_http_server(host, port) + + def worker(): + try: + self._debug_http_server.serve_forever() + except Exception as e: + self._logger.warning(f"[DEBUG] HTTP 服务器异常: {e}") + + self._debug_http_thread = threading.Thread( + target=worker, + name="DebugHttpServer", + daemon=True, + ) + self._debug_http_thread.start() + def initialize(self): """初始化所有组件""" self._logger.info("=" * 50) @@ -171,6 +249,8 @@ class EdgeInferenceService: self._init_postprocessor() self._init_reporter() self._init_algorithm_manager() + self._start_debug_reload_watcher() + self._start_debug_http_server() self._performance_stats["start_time"] = datetime.now() @@ -187,7 +267,7 @@ class EdgeInferenceService: def _load_cameras(self): """加载摄像头配置""" cameras = self._config_manager.get_cameras() - + for camera in cameras: try: self._stream_manager.add_stream( @@ -199,6 +279,37 @@ class EdgeInferenceService: self._logger.info(f"已添加摄像头: {camera.camera_id}") except Exception as e: self._logger.error(f"添加摄像头失败 {camera.camera_id}: {e}") + + def _reload_cameras(self): + """配置更新后动态加载新摄像头(不重复添加已有的)""" + if not self._stream_manager or not self._config_manager: + return + try: + cameras = self._config_manager.get_cameras(force_refresh=True) + existing = set(self._stream_manager._streams.keys()) + added = 0 + for camera in cameras: + if camera.camera_id in existing: + continue + if not camera.rtsp_url: + self._logger.warning(f"摄像头 {camera.camera_id} 无 rtsp_url,跳过") + continue + try: + self._stream_manager.add_stream( + camera_id=camera.camera_id, + rtsp_url=camera.rtsp_url, + target_fps=self._settings.video_stream.default_fps, + on_frame_callback=self._create_frame_callback(camera.camera_id) + ) + self._stream_manager._streams[camera.camera_id].start() + added += 1 + self._logger.info(f"动态添加并启动摄像头: {camera.camera_id}") + except Exception as e: + self._logger.error(f"动态添加摄像头失败 {camera.camera_id}: {e}") + if added > 0: + self._logger.info(f"配置更新后新增 {added} 个摄像头流") + except Exception as e: + self._logger.error(f"动态加载摄像头失败: {e}") def _create_frame_callback(self, camera_id: str): """创建帧处理回调""" @@ -358,6 +469,9 @@ class EdgeInferenceService: self._logger.warning("算法管理器不可用,跳过算法处理") return + if self._reporter is None: + self._logger.debug("ResultReporter 未启用,跳过告警上报") + return roi_id = roi.roi_id algo_code = bind.algo_code algo_params = bind.params or {} @@ -542,6 +656,12 @@ class EdgeInferenceService: if self._reporter: self._reporter.close() + + if self._debug_http_server: + try: + self._debug_http_server.shutdown() + except Exception: + pass self._performance_stats["uptime_seconds"] = ( (datetime.now() - self._performance_stats["start_time"]).total_seconds()