From 0191e498f1872d4579dbf97216c03d51bb183232 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 10 Feb 2026 15:21:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=91=8A=E8=AD=A6HTTP=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5=20+=20=E6=97=A5=E5=BF=97=E7=B2=BE=E7=AE=80=20+=20?= =?UTF-8?q?=E8=BE=B9=E7=BC=98=E8=8A=82=E7=82=B9=E7=BB=9F=E4=B8=80=E4=B8=BA?= =?UTF-8?q?edge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 alarm_upload_worker.py 异步告警上报(COS+HTTP) - result_reporter 重构为Redis队列模式 - config_sync 适配WVP直推的聚合配置格式 - settings 默认 EDGE_DEVICE_ID 改为 edge - 日志设置非告警模块为WARNING级别减少噪音 - main.py 集成新的告警上报流程 Co-Authored-By: Claude Opus 4.6 --- config/settings.py | 76 ++- core/alarm_upload_worker.py | 412 +++++++++++++ core/config_sync.py | 1114 +++++++++++++++++------------------ core/result_reporter.py | 602 ++++++------------- main.py | 116 ++-- utils/logger.py | 34 ++ 6 files changed, 1279 insertions(+), 1075 deletions(-) create mode 100644 core/alarm_upload_worker.py diff --git a/config/settings.py b/config/settings.py index 6825f3d..499366d 100644 --- a/config/settings.py +++ b/config/settings.py @@ -34,7 +34,7 @@ class SQLiteConfig: @dataclass class RedisConfig: - """Redis配置类""" + """Redis配置类(本地 Redis,边缘侧缓存)""" host: str = "localhost" port: int = 6379 db: int = 0 @@ -43,13 +43,35 @@ class RedisConfig: max_connections: int = 50 +@dataclass +class CloudRedisConfig: + """云端 Redis 配置(三层权威模型 - 云端层)""" + host: str = "localhost" + port: int = 6379 + db: int = 0 + password: Optional[str] = None + decode_responses: bool = True + max_connections: int = 20 + + +@dataclass +class LocalRedisConfig: + """本地 Redis 配置(三层权威模型 - 边缘层缓存)""" + host: str = "localhost" + port: int = 6379 + db: int = 1 + password: Optional[str] = None + decode_responses: bool = True + max_connections: int = 20 + + @dataclass class MQTTConfig: - """MQTT配置类""" + """MQTT配置类(保留配置结构,不再用于告警上报)""" broker_host: str = "localhost" broker_port: int = 1883 client_id: str = "edge_inference_service" - device_id: str = "edge-001" + device_id: str = "edge" username: Optional[str] = None password: Optional[str] = None keepalive: int = 60 @@ -58,6 +80,24 @@ class MQTTConfig: max_reconnect_attempts: int = 10 +@dataclass +class COSConfig: + """腾讯云 COS 配置""" + secret_id: str = "" + secret_key: str = "" + region: str = "ap-beijing" + bucket: str = "" + + +@dataclass +class AlarmUploadConfig: + """告警上报配置""" + cloud_api_url: str = "http://124.221.55.225:8000" + edge_token: str = "" + retry_max: int = 3 + retry_interval: int = 5 + + @dataclass class VideoStreamConfig: """视频流配置类""" @@ -157,15 +197,43 @@ class Settings: port=int(os.getenv("REDIS_PORT", "6379")), password=os.getenv("REDIS_PASSWORD"), ) + + self.cloud_redis = CloudRedisConfig( + host=os.getenv("CLOUD_REDIS_HOST", "localhost"), + port=int(os.getenv("CLOUD_REDIS_PORT", "6379")), + db=int(os.getenv("CLOUD_REDIS_DB", "0")), + password=os.getenv("CLOUD_REDIS_PASSWORD"), + ) + + self.local_redis = LocalRedisConfig( + host=os.getenv("LOCAL_REDIS_HOST", "localhost"), + port=int(os.getenv("LOCAL_REDIS_PORT", "6379")), + db=int(os.getenv("LOCAL_REDIS_DB", "1")), + password=os.getenv("LOCAL_REDIS_PASSWORD"), + ) self.mqtt = MQTTConfig( broker_host=os.getenv("MQTT_BROKER_HOST", "localhost"), broker_port=int(os.getenv("MQTT_BROKER_PORT", "1883")), client_id=os.getenv("MQTT_CLIENT_ID", "edge_inference_service"), - device_id=os.getenv("EDGE_DEVICE_ID", "edge-001"), + device_id=os.getenv("EDGE_DEVICE_ID", "edge"), username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"), ) + + self.cos = COSConfig( + secret_id=os.getenv("COS_SECRET_ID", ""), + secret_key=os.getenv("COS_SECRET_KEY", ""), + region=os.getenv("COS_REGION", "ap-beijing"), + bucket=os.getenv("COS_BUCKET", ""), + ) + + self.alarm_upload = AlarmUploadConfig( + cloud_api_url=os.getenv("CLOUD_API_URL", "http://124.221.55.225:8000"), + edge_token=os.getenv("EDGE_TOKEN", ""), + retry_max=int(os.getenv("ALARM_RETRY_MAX", "3")), + retry_interval=int(os.getenv("ALARM_RETRY_INTERVAL", "5")), + ) self.video_stream = VideoStreamConfig( default_fps=int(os.getenv("VIDEO_DEFAULT_FPS", "5")), diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py new file mode 100644 index 0000000..c661bd4 --- /dev/null +++ b/core/alarm_upload_worker.py @@ -0,0 +1,412 @@ +""" +告警上报 Worker + +独立线程运行,从 Redis 队列消费告警: +1. BRPOP 取待上报告警 +2. 上传截图到腾讯云 COS +3. HTTP POST 上报告警元数据到云端 +4. 失败重试 / 死信队列 + +Redis Key 设计: + local:alarm:pending - 待上报告警队列 + local:alarm:retry - 重试告警队列 + local:alarm:dead - 死信告警队列 +""" + +import json +import logging +import os +import threading +import time +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +import redis +import requests + +from config.settings import get_settings +from core.result_reporter import ( + REDIS_KEY_ALARM_PENDING, + REDIS_KEY_ALARM_RETRY, + REDIS_KEY_ALARM_DEAD, +) + +logger = logging.getLogger(__name__) + + +class AlarmUploadWorker: + """告警上报 Worker + + 在独立线程中运行,从本地 Redis 消费告警数据, + 上传截图到 COS,再 HTTP 上报告警元数据到云端。 + """ + + def __init__(self): + self._settings = get_settings() + self._logger = logging.getLogger("alarm_upload_worker") + + self._redis: Optional[redis.Redis] = None + self._cos_client = None # 懒初始化 + + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + self._stats = { + "processed": 0, + "cos_uploaded": 0, + "http_reported": 0, + "retried": 0, + "dead_lettered": 0, + "errors": 0, + } + + def start(self): + """启动 worker 线程""" + if self._thread and self._thread.is_alive(): + self._logger.warning("AlarmUploadWorker 已在运行") + return + + # 初始化 Redis 连接 + redis_cfg = self._settings.local_redis + try: + self._redis = redis.Redis( + host=redis_cfg.host, + port=redis_cfg.port, + db=redis_cfg.db, + password=redis_cfg.password, + decode_responses=True, + socket_connect_timeout=5, + ) + self._redis.ping() + self._logger.info(f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}") + except Exception as e: + self._logger.error(f"Worker Redis 连接失败: {e}") + return + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._worker_loop, + name="AlarmUploadWorker", + daemon=True, + ) + self._thread.start() + self._logger.info("AlarmUploadWorker 已启动") + + def stop(self): + """停止 worker""" + if not self._thread or not self._thread.is_alive(): + return + + self._logger.info("正在停止 AlarmUploadWorker...") + self._stop_event.set() + + # 等待线程退出(BRPOP 有超时,所以不会永远阻塞) + self._thread.join(timeout=10) + + if self._redis: + try: + self._redis.close() + except Exception: + pass + + self._logger.info("AlarmUploadWorker 已停止") + + def _worker_loop(self): + """主循环:消费待上报告警""" + self._logger.info("Worker 主循环开始") + + while not self._stop_event.is_set(): + try: + # 先检查重试队列 + self._process_retry_queue() + + # BRPOP 阻塞等待新告警(超时 2 秒,便于检查 stop 信号) + result = self._redis.brpop(REDIS_KEY_ALARM_PENDING, timeout=2) + if result is None: + continue + + _, alarm_json = result + self._process_alarm(alarm_json) + + except redis.ConnectionError as e: + self._logger.error(f"Redis 连接断开: {e}") + time.sleep(5) + self._reconnect_redis() + except Exception as e: + self._logger.error(f"Worker 主循环异常: {e}") + self._stats["errors"] += 1 + time.sleep(1) + + self._logger.info("Worker 主循环退出") + + def _process_alarm(self, alarm_json: str): + """处理单条告警""" + try: + alarm_data = json.loads(alarm_json) + except json.JSONDecodeError as e: + self._logger.error(f"告警 JSON 解析失败: {e}") + return + + alarm_id = alarm_data.get("alarm_id", "unknown") + retry_count = alarm_data.get("_retry_count", 0) + + self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})") + + # Step 1: 上传截图到 COS + snapshot_local_path = alarm_data.get("snapshot_local_path") + object_key = None + + if snapshot_local_path and os.path.exists(snapshot_local_path): + object_key = self._upload_snapshot_to_cos( + snapshot_local_path, + alarm_id, + alarm_data.get("device_id", "unknown"), + ) + if object_key is None: + # COS 上传失败,进入重试 + self._handle_retry(alarm_json, "COS 上传失败") + return + else: + if snapshot_local_path: + self._logger.warning(f"截图文件不存在: {snapshot_local_path}") + + # Step 2: HTTP 上报告警元数据 + report_data = { + "alarm_id": alarm_data.get("alarm_id"), + "alarm_type": alarm_data.get("alarm_type"), + "device_id": alarm_data.get("device_id"), + "scene_id": alarm_data.get("scene_id"), + "event_time": alarm_data.get("event_time"), + "alarm_level": alarm_data.get("alarm_level"), + "snapshot_url": object_key or "", # COS object_key + "algorithm_code": alarm_data.get("algorithm_code"), + "confidence_score": alarm_data.get("confidence_score"), + "ext_data": alarm_data.get("ext_data", {}), + } + + success = self._report_alarm_http(report_data) + + if success: + self._stats["processed"] += 1 + self._logger.info(f"告警上报成功: {alarm_id}") + + # 可选:删除本地截图 + if snapshot_local_path and os.path.exists(snapshot_local_path): + try: + os.remove(snapshot_local_path) + self._logger.debug(f"已删除本地截图: {snapshot_local_path}") + except Exception as e: + self._logger.warning(f"删除本地截图失败: {e}") + else: + # HTTP 上报失败,进入重试 + self._handle_retry(alarm_json, "HTTP 上报失败") + + def _upload_snapshot_to_cos( + self, local_path: str, alarm_id: str, device_id: str + ) -> Optional[str]: + """ + 上传截图到腾讯云 COS + + Args: + local_path: 本地截图路径 + alarm_id: 告警ID + device_id: 设备ID + + Returns: + COS object_key,失败返回 None + """ + cos_cfg = self._settings.cos + if not cos_cfg.secret_id or not cos_cfg.bucket: + self._logger.warning("COS 未配置,跳过截图上传") + return "" + + # 懒初始化 COS 客户端 + if self._cos_client is None: + try: + from qcloud_cos import CosConfig, CosS3Client + + config = CosConfig( + Region=cos_cfg.region, + SecretId=cos_cfg.secret_id, + SecretKey=cos_cfg.secret_key, + Scheme="https", + ) + self._cos_client = CosS3Client(config) + self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}") + except Exception as e: + self._logger.error(f"COS 客户端初始化失败: {e}") + return None + + # 生成 Object Key: alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg" + + try: + self._cos_client.put_object_from_local_file( + Bucket=cos_cfg.bucket, + LocalFilePath=local_path, + Key=object_key, + ) + self._stats["cos_uploaded"] += 1 + self._logger.info(f"COS 上传成功: {object_key}") + return object_key + + except Exception as e: + self._logger.error(f"COS 上传失败: {e}") + return None + + def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool: + """ + HTTP POST 上报告警元数据到云端 + + Args: + alarm_data: 告警元数据 + + Returns: + 是否上报成功 + """ + upload_cfg = self._settings.alarm_upload + url = f"{upload_cfg.cloud_api_url}/admin-api/aiot/alarm/edge/report" + + headers = { + "Content-Type": "application/json", + } + if upload_cfg.edge_token: + headers["Authorization"] = f"Bearer {upload_cfg.edge_token}" + + try: + response = requests.post( + url, + json=alarm_data, + headers=headers, + timeout=10, + ) + + if response.status_code == 200: + body = response.json() + if body.get("code") == 0: + self._stats["http_reported"] += 1 + return True + else: + self._logger.warning( + f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}" + ) + return False + else: + self._logger.warning(f"HTTP 上报失败: status={response.status_code}") + return False + + except requests.Timeout: + self._logger.warning(f"HTTP 上报超时: {url}") + return False + except requests.ConnectionError as e: + self._logger.warning(f"HTTP 上报连接失败: {e}") + return False + except Exception as e: + self._logger.error(f"HTTP 上报异常: {e}") + return False + + def _handle_retry(self, alarm_json: str, error: str): + """处理重试逻辑""" + try: + alarm_data = json.loads(alarm_json) + except json.JSONDecodeError: + return + + retry_count = alarm_data.get("_retry_count", 0) + 1 + max_retry = self._settings.alarm_upload.retry_max + + if retry_count > max_retry: + # 超过最大重试次数,写入死信队列 + alarm_data["_dead_reason"] = error + alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat() + dead_json = json.dumps(alarm_data, ensure_ascii=False) + self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json) + self._stats["dead_lettered"] += 1 + self._logger.warning( + f"告警进入死信队列: {alarm_data.get('alarm_id')}, " + f"reason={error}, retries={retry_count - 1}" + ) + return + + # 指数退避:base_interval * 2^(retry_count-1) + base_interval = self._settings.alarm_upload.retry_interval + delay = base_interval * (2 ** (retry_count - 1)) + + alarm_data["_retry_count"] = retry_count + alarm_data["_retry_at"] = ( + datetime.now(timezone.utc).timestamp() + delay + ) + retry_json = json.dumps(alarm_data, ensure_ascii=False) + + self._redis.lpush(REDIS_KEY_ALARM_RETRY, retry_json) + self._stats["retried"] += 1 + + self._logger.info( + f"告警将重试: {alarm_data.get('alarm_id')}, " + f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}" + ) + + def _process_retry_queue(self): + """检查重试队列中到期的告警""" + if not self._redis: + return + + try: + queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY) + if queue_len == 0: + return + + now = datetime.now(timezone.utc).timestamp() + + # 逐条检查(FIFO: RPOP 取最早的) + checked = 0 + while checked < queue_len: + item = self._redis.rpop(REDIS_KEY_ALARM_RETRY) + if item is None: + break + + checked += 1 + + try: + data = json.loads(item) + retry_at = data.get("_retry_at", 0) + + if now >= retry_at: + # 到期,放回 pending 队列重新处理 + self._redis.lpush(REDIS_KEY_ALARM_PENDING, item) + self._logger.debug( + f"重试告警回归 pending: {data.get('alarm_id')}" + ) + else: + # 未到期,放回 retry 队列头部 + self._redis.lpush(REDIS_KEY_ALARM_RETRY, item) + except json.JSONDecodeError: + pass + + except Exception as e: + self._logger.error(f"处理重试队列异常: {e}") + + def _reconnect_redis(self): + """重连 Redis""" + redis_cfg = self._settings.local_redis + try: + self._redis = redis.Redis( + host=redis_cfg.host, + port=redis_cfg.port, + db=redis_cfg.db, + password=redis_cfg.password, + decode_responses=True, + socket_connect_timeout=5, + ) + self._redis.ping() + self._logger.info("Redis 重连成功") + except Exception as e: + self._logger.error(f"Redis 重连失败: {e}") + self._redis = None + + def get_statistics(self) -> Dict[str, Any]: + """获取 Worker 统计""" + stats = self._stats.copy() + stats["running"] = self._thread is not None and self._thread.is_alive() + return stats diff --git a/core/config_sync.py b/core/config_sync.py index f88ee4c..2caca4f 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -1,10 +1,13 @@ """ -配置同步模块 -实现 SQLite 本地配置存储、Redis Pub/Sub订阅、配置缓存与动态刷新 +配置同步模块 - 双 Redis 三层权威模型 + +架构: + MySQL(云端权威源) → 云端 Redis(分发 + 版本控制) → 边缘 Redis(本地缓存 + 离线运行) 存储策略: -- SQLite: 本地配置存储(摄像头、ROI) -- Redis: 配置变更实时推送 +- 云端 Redis: 配置分发中枢,通过 Stream 推送变更事件 +- 本地 Redis: 边缘自治核心,算法进程只读本地 Redis +- SQLite: 本地持久化存储(摄像头、ROI 配置) - 内存缓存: 热点配置快速访问 """ @@ -17,28 +20,40 @@ from typing import Any, Callable, Dict, List, Optional, Set import redis from redis import Redis -from redis.client import PubSub -from config.settings import get_settings, RedisConfig +from config.settings import get_settings from config.database import get_sqlite_manager, SQLiteManager -from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ConfigVersion, ROIInfoNew, ROIAlgoBind +from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ROIInfoNew, ROIAlgoBind from utils.version_control import get_version_control logger = logging.getLogger(__name__) +# ==================== Redis Key 常量 ==================== + +# 云端 Redis Keys +CLOUD_DEVICE_CONFIG_KEY = "device:{device_id}:config" # 设备最新配置 JSON +CLOUD_DEVICE_VERSION_KEY = "device:{device_id}:version" # 配置版本号 +CLOUD_CONFIG_STREAM = "device_config_stream" # 配置变更 Stream + +# 本地 Redis Keys +LOCAL_CONFIG_CURRENT = "local:device:config:current" # 当前生效配置 +LOCAL_CONFIG_BACKUP = "local:device:config:backup" # 上一次成功配置(回滚用) +LOCAL_CONFIG_VERSION = "local:device:config:version" # 当前版本号 +LOCAL_STREAM_LAST_ID = "local:device:config:stream_last_id" # 上次消费的 Stream ID + + class ConfigCache: """配置缓存管理类""" - + def __init__(self, max_size: int = 1000, ttl: int = 300): self._cache: Dict[str, Any] = {} self._access_times: Dict[str, float] = {} self._max_size = max_size self._ttl = ttl self._lock = threading.RLock() - + def get(self, key: str) -> Optional[Any]: - """从缓存获取配置""" with self._lock: if key in self._cache: access_time = self._access_times.get(key, 0) @@ -48,44 +63,36 @@ class ConfigCache: else: self._delete(key) return None - + def set(self, key: str, value: Any): - """设置配置到缓存""" with self._lock: if len(self._cache) >= self._max_size: self._evict_lru() self._cache[key] = value self._access_times[key] = time.time() - + def delete(self, key: str): - """删除缓存项""" with self._lock: self._delete(key) - + def _delete(self, key: str): - """内部删除方法(不获取锁)""" self._cache.pop(key, None) self._access_times.pop(key, None) - + def _evict_lru(self): - """淘汰最少使用的缓存项""" if not self._access_times: return - min_access_time = min(self._access_times.values()) lru_keys = [k for k, v in self._access_times.items() if v == min_access_time] - for key in lru_keys[:10]: self._delete(key) - + def clear(self): - """清空缓存""" with self._lock: self._cache.clear() self._access_times.clear() - + def get_stats(self) -> Dict[str, Any]: - """获取缓存统计信息""" with self._lock: return { "size": len(self._cache), @@ -95,11 +102,25 @@ class ConfigCache: class ConfigSyncManager: - """配置同步管理器类""" - + """配置同步管理器 - 双 Redis + Stream 模式 + + 启动流程: + 1. 连接本地 Redis,加载上次的配置(保证离线可用) + 2. 连接云端 Redis,拉取最新配置并比对版本 + 3. 启动 Stream 监听线程,持续接收增量变更 + + 配置变更流程: + 1. 收到 Stream 事件(device_id, version, action) + 2. 从云端 Redis 拉取完整配置 + 3. 校验 version > 本地 version + 4. 备份当前配置 → 应用新配置 → 同步到 SQLite + 5. 触发回调通知算法模块热更新 + 6. 失败时回滚到 backup 配置 + """ + _instance = None _lock = threading.Lock() - + def __new__(cls): if cls._instance is None: with cls._lock: @@ -107,349 +128,516 @@ class ConfigSyncManager: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance - + def __init__(self): if self._initialized: return - + settings = get_settings() + self._device_id = settings.mqtt.device_id # 边缘节点 ID self._config_version = settings.config_version self._cache = ConfigCache() - self._redis_client = None - self._redis_pubsub_client = None - self._redis_pubsub = None - self._pubsub_thread = None - self._stop_event = threading.Event() - self._callbacks: Dict[str, Set[Callable]] = {} self._db_manager = None - self._initialized = True - - self._init_redis() + + # 双 Redis 客户端 + self._cloud_redis: Optional[Redis] = None + self._local_redis: Optional[Redis] = None + + # Stream 监听 + self._stream_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._stream_last_id = "0" # 从头开始消费 + + # 配置变更回调 + self._callbacks: Dict[str, Set[Callable]] = {} + self._version_control = get_version_control() - - def _init_redis(self): - """初始化Redis连接""" + self._initialized = True + + self._init_local_redis() + self._init_cloud_redis() + + # ==================== Redis 初始化 ==================== + + def _init_local_redis(self): + """初始化本地 Redis 连接""" try: settings = get_settings() - redis_config = settings.redis + cfg = settings.local_redis + self._local_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True, + ) + self._local_redis.ping() + logger.info(f"本地 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}") - self._redis_client = redis.Redis( - host=redis_config.host, - port=redis_config.port, - db=redis_config.db, - password=redis_config.password, - decode_responses=redis_config.decode_responses, + # 恢复上次的 stream last_id + last_id = self._local_redis.get(LOCAL_STREAM_LAST_ID) + if last_id: + self._stream_last_id = last_id + logger.info(f"恢复 Stream 消费位置: {last_id}") + + except Exception as e: + logger.error(f"本地 Redis 连接失败: {e}") + self._local_redis = None + + def _init_cloud_redis(self): + """初始化云端 Redis 连接""" + try: + settings = get_settings() + cfg = settings.cloud_redis + self._cloud_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, ) + self._cloud_redis.ping() + logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}") - # Pub/Sub专用连接,不设socket_timeout,避免listen()周期性超时 - self._redis_pubsub_client = redis.Redis( - host=redis_config.host, - port=redis_config.port, - db=redis_config.db, - password=redis_config.password, - decode_responses=redis_config.decode_responses, - socket_connect_timeout=10, - ) - - self._redis_client.ping() - logger.info(f"Redis连接成功: {redis_config.host}:{redis_config.port}") - except Exception as e: - logger.error(f"Redis连接失败: {e}") - self._redis_client = None - self._redis_pubsub_client = None - + logger.warning(f"云端 Redis 连接失败(将使用本地缓存运行): {e}") + self._cloud_redis = None + def _init_database(self): - """初始化数据库连接""" + """初始化 SQLite 数据库连接""" if self._db_manager is None: self._db_manager = get_sqlite_manager() - + + # ==================== 配置版本与属性 ==================== + @property def config_version(self) -> str: - """获取当前配置版本""" return self._config_version - + + def get_config_version(self) -> int: + """获取当前本地配置版本号""" + if self._local_redis: + try: + ver = self._local_redis.get(LOCAL_CONFIG_VERSION) + return int(ver) if ver else 0 + except Exception: + pass + return 0 + + # ==================== 启动与停止 ==================== + + def start_config_subscription(self): + """启动配置同步 + + 1. 从本地 Redis 加载配置(离线优先) + 2. 从云端同步最新配置(如果可用) + 3. 启动 Stream 监听线程 + """ + # Step 1: 从本地 Redis 加载已有配置到 SQLite + self._load_from_local_redis() + + # Step 2: 尝试从云端拉取最新配置 + if self._cloud_redis: + try: + self._sync_from_cloud() + logger.info("启动时云端配置同步完成") + except Exception as e: + logger.warning(f"启动时云端同步失败(使用本地缓存): {e}") + + # Step 3: 启动 Stream 监听线程 + if self._stream_thread is None or not self._stream_thread.is_alive(): + self._stop_event.clear() + self._stream_thread = threading.Thread( + target=self._listen_config_stream, + name="ConfigStreamListener", + daemon=True + ) + self._stream_thread.start() + logger.info("配置 Stream 监听线程已启动") + + def stop_config_subscription(self): + """停止配置同步""" + self._stop_event.set() + if self._stream_thread and self._stream_thread.is_alive(): + self._stream_thread.join(timeout=5) + logger.info("配置 Stream 监听线程已停止") + + # ==================== Stream 监听 ==================== + + def _listen_config_stream(self): + """监听云端 Redis Stream(带自动重连和指数退避)""" + backoff = 5 # 初始退避秒数 + max_backoff = 60 + + while not self._stop_event.is_set(): + try: + if not self._cloud_redis: + self._init_cloud_redis() + if not self._cloud_redis: + logger.warning(f"云端 Redis 不可用,{backoff}s 后重试...") + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + continue + + # 连接成功,重置退避 + backoff = 5 + + logger.info(f"开始监听 Stream: {CLOUD_CONFIG_STREAM}, last_id={self._stream_last_id}") + + while not self._stop_event.is_set(): + # XREAD BLOCK 5000ms(每 5 秒检查一次 stop_event) + result = self._cloud_redis.xread( + {CLOUD_CONFIG_STREAM: self._stream_last_id}, + count=10, + block=5000, + ) + + if not result: + continue + + for stream_name, messages in result: + for msg_id, msg_data in messages: + try: + self._handle_stream_event(msg_data) + except Exception as e: + logger.error(f"处理 Stream 事件失败: {e}") + + # 更新消费位置 + self._stream_last_id = msg_id + if self._local_redis: + try: + self._local_redis.set(LOCAL_STREAM_LAST_ID, msg_id) + except Exception: + pass + + except redis.ConnectionError as e: + if self._stop_event.is_set(): + return + logger.warning(f"云端 Redis 连接断开: {e}, {backoff}s 后重连...") + self._cloud_redis = None + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + + except Exception as e: + if self._stop_event.is_set(): + return + logger.error(f"Stream 监听异常: {e}, {backoff}s 后重试...") + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + + def _handle_stream_event(self, msg_data: dict): + """处理 Stream 配置变更事件""" + device_id = msg_data.get("device_id", "") + version = int(msg_data.get("version", 0)) + action = msg_data.get("action", "UPDATE") + + # 只处理属于本设备的事件(或广播事件 device_id="") + if device_id and device_id != self._device_id: + return + + logger.info(f"收到配置变更: device={device_id}, version={version}, action={action}") + + local_version = self.get_config_version() + if version <= local_version and action != "ROLLBACK": + logger.info(f"忽略旧版本事件: cloud={version} <= local={local_version}") + return + + if action == "ROLLBACK": + self._rollback_config() + else: + self._sync_from_cloud() + + # ==================== 云端同步 ==================== + + def _sync_from_cloud(self): + """从云端 Redis 拉取最新配置并应用""" + if not self._cloud_redis: + return + + config_key = CLOUD_DEVICE_CONFIG_KEY.format(device_id=self._device_id) + version_key = CLOUD_DEVICE_VERSION_KEY.format(device_id=self._device_id) + + try: + config_json = self._cloud_redis.get(config_key) + cloud_version = self._cloud_redis.get(version_key) + cloud_version = int(cloud_version) if cloud_version else 0 + + if not config_json: + logger.info("云端无此设备配置,跳过同步") + return + + local_version = self.get_config_version() + if cloud_version <= local_version: + logger.info(f"本地配置已是最新: local={local_version}, cloud={cloud_version}") + return + + config_data = json.loads(config_json) + + # 备份当前配置 + self._backup_current_config() + + # 应用新配置 + self._apply_config(config_data, cloud_version) + + logger.info(f"配置同步成功: version {local_version} → {cloud_version}") + + except Exception as e: + logger.error(f"从云端同步配置失败: {e}") + self._rollback_config() + + def _apply_config(self, config_data: dict, version: int): + """应用配置到本地 Redis + SQLite""" + try: + # 1. 写入本地 Redis + if self._local_redis: + self._local_redis.set(LOCAL_CONFIG_CURRENT, json.dumps(config_data, ensure_ascii=False)) + self._local_redis.set(LOCAL_CONFIG_VERSION, str(version)) + + # 2. 同步到 SQLite(推理管线从 SQLite 读取) + self._sync_config_to_sqlite(config_data) + + # 3. 清除内存缓存 + self._cache.clear() + + # 4. 更新版本号 + self._config_version = str(version) + + # 5. 通知回调 + self._notify_callbacks("config_update", { + "type": "full", + "version": version, + "affected_items": ["camera", "roi", "bind"], + }) + + # 6. 记录版本更新 + self._version_control.record_update( + version=str(version), + update_type="配置同步", + description=f"从云端同步配置 v{version}", + updated_by="云端系统", + affected_items=["camera", "roi", "bind"], + details={"source": "cloud_redis"} + ) + + except Exception as e: + logger.error(f"应用配置失败: {e}") + raise + + def _backup_current_config(self): + """备份当前配置到本地 Redis""" + if not self._local_redis: + return + try: + current = self._local_redis.get(LOCAL_CONFIG_CURRENT) + if current: + self._local_redis.set(LOCAL_CONFIG_BACKUP, current) + logger.info("当前配置已备份") + except Exception as e: + logger.warning(f"配置备份失败: {e}") + + def _rollback_config(self): + """回滚到 backup 配置""" + if not self._local_redis: + logger.warning("本地 Redis 不可用,无法回滚") + return + + try: + backup = self._local_redis.get(LOCAL_CONFIG_BACKUP) + if not backup: + logger.warning("无备份配置,无法回滚") + return + + config_data = json.loads(backup) + self._local_redis.set(LOCAL_CONFIG_CURRENT, backup) + self._sync_config_to_sqlite(config_data) + self._cache.clear() + + logger.info("配置已回滚到上一个版本") + + self._notify_callbacks("config_update", { + "type": "rollback", + "affected_items": ["camera", "roi", "bind"], + }) + + except Exception as e: + logger.error(f"配置回滚失败: {e}") + + def _load_from_local_redis(self): + """启动时从本地 Redis 加载配置到 SQLite""" + if not self._local_redis: + logger.info("本地 Redis 不可用,使用 SQLite 现有配置") + return + + try: + config_json = self._local_redis.get(LOCAL_CONFIG_CURRENT) + if config_json: + config_data = json.loads(config_json) + self._sync_config_to_sqlite(config_data) + version = self._local_redis.get(LOCAL_CONFIG_VERSION) + if version: + self._config_version = version + logger.info(f"从本地 Redis 加载配置成功, version={version}") + else: + logger.info("本地 Redis 无缓存配置,使用 SQLite 现有数据") + except Exception as e: + logger.warning(f"从本地 Redis 加载配置失败: {e}") + + # ==================== SQLite 同步 ==================== + + def _sync_config_to_sqlite(self, config_data: dict): + """将配置数据同步到 SQLite""" + self._init_database() + if not self._db_manager: + logger.warning("SQLite 不可用,跳过同步") + return + + count = 0 + try: + # 同步摄像头配置 + cameras = config_data.get("cameras", []) + for cam in cameras: + try: + self._db_manager.save_camera_config( + camera_id=cam.get("camera_id", ""), + rtsp_url=cam.get("rtsp_url", ""), + camera_name=cam.get("camera_name", ""), + enabled=cam.get("enabled", True), + location=cam.get("location", ""), + ) + count += 1 + except Exception as e: + logger.error(f"同步摄像头配置失败: {e}") + + # 同步 ROI 配置 + rois = config_data.get("rois", []) + for roi in rois: + try: + coordinates = roi.get("coordinates", []) + if coordinates and isinstance(coordinates[0], dict): + coordinates = [[p.get("x", 0), p.get("y", 0)] for p in coordinates] + + self._db_manager.save_roi_config( + roi_id=roi.get("roi_id", ""), + camera_id=roi.get("camera_id", ""), + roi_type=roi.get("roi_type", "polygon"), + coordinates=coordinates, + enabled=roi.get("enabled", True), + priority=roi.get("priority", 0), + ) + count += 1 + except Exception as e: + logger.error(f"同步 ROI 配置失败: {e}") + + # 同步算法绑定 + binds = config_data.get("binds", []) + for bind in binds: + try: + self._db_manager.save_roi_algo_bind( + bind_id=bind.get("bind_id", ""), + roi_id=bind.get("roi_id", ""), + algo_code=bind.get("algo_code", ""), + params=bind.get("params", {}), + priority=bind.get("priority", 0), + enabled=bind.get("enabled", True), + ) + count += 1 + except Exception as e: + logger.error(f"同步算法绑定失败: {e}") + + logger.info(f"配置同步到 SQLite 完成: {count} 条记录") + + except Exception as e: + logger.error(f"配置同步到 SQLite 失败: {e}") + + # ==================== 回调管理 ==================== + def register_callback(self, topic: str, callback: Callable): """注册配置变更回调函数""" if topic not in self._callbacks: self._callbacks[topic] = set() self._callbacks[topic].add(callback) logger.info(f"已注册配置变更回调: {topic}") - + def unregister_callback(self, topic: str, callback: Callable): - """注销配置变更回调函数""" if topic in self._callbacks: self._callbacks[topic].discard(callback) - + def _notify_callbacks(self, topic: str, data: Dict[str, Any]): - """触发配置变更回调""" if topic in self._callbacks: for callback in self._callbacks[topic]: try: callback(topic, data) except Exception as e: logger.error(f"配置变更回调执行失败: {e}") - - def _subscribe_config_updates(self): - """订阅配置更新主题(带自动重连)""" - if not self._redis_pubsub_client: - logger.warning("Redis未连接,无法订阅配置更新") - return - while not self._stop_event.is_set(): - try: - self._redis_pubsub = self._redis_pubsub_client.pubsub() - self._redis_pubsub.subscribe("config_update") + # ==================== 配置读取(供推理管线使用) ==================== - logger.info("已订阅config_update主题") - - for message in self._redis_pubsub.listen(): - if self._stop_event.is_set(): - return - - if message["type"] == "message": - try: - data = json.loads(message["data"]) - self._handle_config_update(data) - except Exception as e: - logger.error(f"处理配置更新消息失败: {e}") - - except Exception as e: - if self._stop_event.is_set(): - return - logger.debug(f"配置更新订阅重连中: {e}") - try: - if self._redis_pubsub: - self._redis_pubsub.close() - except Exception: - pass - self._stop_event.wait(3) # 等3秒后重连 - - def _handle_config_update(self, data: Dict[str, Any]): - """处理配置更新消息""" - update_type = data.get("type", "full") - affected_items = data.get("affected_items", []) - version = data.get("version", self._config_version) - - logger.info(f"收到配置更新通知: type={update_type}, items={affected_items}") - - # 从Redis同步最新配置到SQLite(主推理管线从SQLite读取) - self._sync_redis_to_sqlite(affected_items) - - if "camera" in affected_items or "all" in affected_items: - self._cache.delete("cameras") - - if "roi" in affected_items or "all" in affected_items: - self._cache.delete("rois") - - # 清除所有带 camera_id 前缀的 rois_bindings 缓存 - self._cache.clear() - - self._config_version = version - self._notify_callbacks("config_update", data) - - self._version_control.record_update( - version=version, - update_type="配置更新", - description=f"云端配置更新,影响范围: {', '.join(affected_items)}", - updated_by="云端系统", - affected_items=affected_items, - details=data - ) - - def _sync_redis_to_sqlite(self, affected_items: List[str]): - """从Redis同步配置到SQLite,使主推理管线能读取最新配置""" - self._init_database() - if not self._redis_client or not self._db_manager: - logger.warning("Redis或SQLite不可用,跳过同步") - return - - try: - sync_cameras = "camera" in affected_items or "all" in affected_items - sync_rois = "roi" in affected_items or "all" in affected_items - - count = 0 - - if sync_cameras: - camera_keys = self._redis_client.keys("config:camera:*") - for key in (camera_keys or []): - try: - data = self._redis_client.hgetall(key) - if not data or not data.get("camera_id"): - continue - enabled = data.get("enabled", "True") == "True" - self._db_manager.save_camera_config( - camera_id=data["camera_id"], - rtsp_url=data.get("rtsp_url", ""), - camera_name=data.get("camera_name", ""), - enabled=enabled, - location=data.get("location", ""), - ) - count += 1 - except Exception as e: - logger.error(f"同步摄像头配置到SQLite失败: key={key}, error={e}") - - if sync_rois: - # 同步ROI配置 - roi_keys = self._redis_client.keys("config:roi:*") - for key in (roi_keys or []): - try: - data = self._redis_client.hgetall(key) - if not data or not data.get("roi_id"): - continue - raw_coords = eval(data["coordinates"]) if data.get("coordinates") else [] - # 兼容 [{x:0.1, y:0.2},...] 格式,转为 [[x,y],...] - if raw_coords and isinstance(raw_coords[0], dict): - coordinates = [[p.get("x", 0), p.get("y", 0)] for p in raw_coords] - else: - coordinates = raw_coords - enabled = data.get("enabled", "True") == "True" - priority = int(data.get("priority", 0)) - self._db_manager.save_roi_config( - roi_id=data["roi_id"], - camera_id=data.get("camera_id", ""), - roi_type=data.get("roi_type", "polygon"), - coordinates=coordinates, - enabled=enabled, - priority=priority, - ) - count += 1 - except Exception as e: - logger.error(f"同步ROI配置到SQLite失败: key={key}, error={e}") - - # 同步绑定配置 - bind_keys = self._redis_client.keys("config:bind:*") - for key in (bind_keys or []): - try: - data = self._redis_client.hgetall(key) - if not data or not data.get("bind_id"): - continue - params = eval(data["params"]) if data.get("params") else {} - enabled = data.get("enabled", "True") == "True" - priority = int(data.get("priority", 0)) - self._db_manager.save_roi_algo_bind( - bind_id=data["bind_id"], - roi_id=data.get("roi_id", ""), - algo_code=data.get("algo_code", ""), - params=params, - priority=priority, - enabled=enabled, - ) - count += 1 - except Exception as e: - logger.error(f"同步绑定配置到SQLite失败: key={key}, error={e}") - - logger.info(f"Redis→SQLite同步完成: {count} 条配置已更新") - - except Exception as e: - logger.error(f"Redis→SQLite同步失败: {e}") - - def start_config_subscription(self): - """启动配置订阅线程""" - # 启动时先从Redis做一次全量同步(防止服务重启时丢失配置) - if self._redis_client: - try: - logger.info("启动时执行Redis→SQLite全量同步...") - self._sync_redis_to_sqlite(["all"]) - except Exception as e: - logger.warning(f"启动时Redis同步失败(将使用本地SQLite配置): {e}") - - if self._pubsub_thread is None or not self._pubsub_thread.is_alive(): - self._stop_event.clear() - self._pubsub_thread = threading.Thread( - target=self._subscribe_config_updates, - name="ConfigSubscription", - daemon=True - ) - self._pubsub_thread.start() - logger.info("配置订阅线程已启动") - - def stop_config_subscription(self): - """停止配置订阅线程""" - self._stop_event.set() - if self._pubsub_thread and self._pubsub_thread.is_alive(): - self._pubsub_thread.join(timeout=5) - logger.info("配置订阅线程已停止") - def get_cameras(self, force_refresh: bool = False) -> List[CameraInfoModel]: - """获取摄像头配置列表""" + """获取摄像头配置列表(从 SQLite 读取)""" cache_key = "cameras" - if not force_refresh: cached = self._cache.get(cache_key) if cached is not None: return cached - + self._init_database() - if self._db_manager is None: - logger.warning("数据库管理器不可用,返回空摄像头列表") return [] - + try: cameras = self._db_manager.get_all_camera_configs() result = [CameraInfoModel.from_dict(c) for c in cameras] - self._cache.set(cache_key, result) logger.info(f"已加载摄像头配置: {len(result)} 个") return result - except Exception as e: logger.error(f"获取摄像头配置失败: {e}") - cached = self._cache.get(cache_key) - return cached or [] - - def get_roi_configs(self, camera_id: Optional[str] = None, + return self._cache.get(cache_key) or [] + + def get_roi_configs(self, camera_id: Optional[str] = None, force_refresh: bool = False) -> List[ROIInfo]: - """获取ROI配置列表(兼容旧版本)""" + """获取 ROI 配置列表""" cache_key = f"rois_{camera_id}" if camera_id else "rois_all" - if not force_refresh: cached = self._cache.get(cache_key) if cached is not None: return cached - + self._init_database() - if self._db_manager is None: - logger.warning("数据库管理器不可用,返回空ROI配置列表") return [] - + try: if camera_id: roi_configs = self._db_manager.get_rois_by_camera(camera_id) else: roi_configs = self._db_manager.get_all_roi_configs() result = [ROIInfo.from_dict(r) for r in roi_configs] - self._cache.set(cache_key, result) - logger.info(f"已加载ROI配置: {len(result)} 个") return result - except Exception as e: - logger.error(f"获取ROI配置失败: {e}") - cached = self._cache.get(cache_key) - return cached or [] - + logger.error(f"获取 ROI 配置失败: {e}") + return self._cache.get(cache_key) or [] + def get_roi_configs_with_bindings(self, camera_id: Optional[str] = None, force_refresh: bool = False) -> List[ROIInfoNew]: - """获取ROI配置列表(包含算法绑定信息)""" + """获取 ROI 配置列表(包含算法绑定信息)""" cache_key = f"rois_bindings_{camera_id}" if camera_id else "rois_bindings_all" - if not force_refresh: cached = self._cache.get(cache_key) if cached is not None: return cached - + self._init_database() - if self._db_manager is None: - logger.warning("数据库管理器不可用,返回空ROI配置列表") return [] - + try: if camera_id: roi_configs = self._db_manager.get_rois_by_camera(camera_id) @@ -460,343 +648,117 @@ class ConfigSyncManager: for roi in roi_configs: bindings = self._db_manager.get_bindings_by_roi(roi['roi_id']) bindings_list.extend(bindings) - + roi_dict = {r['roi_id']: r for r in roi_configs} - bindings_dict = {} + bindings_dict: Dict[str, list] = {} for b in bindings_list: roi_id = b['roi_id'] if roi_id not in bindings_dict: bindings_dict[roi_id] = [] bindings_dict[roi_id].append(b) - + result = [] for roi_id, roi_data in roi_dict.items(): roi_info = ROIInfoNew.from_dict(roi_data) if roi_id in bindings_dict: roi_info.bindings = [ROIAlgoBind.from_dict(b) for b in bindings_dict[roi_id]] result.append(roi_info) - + result.sort(key=lambda x: x.priority, reverse=True) - self._cache.set(cache_key, result) - logger.info(f"已加载ROI配置(含绑定): {len(result)} 个") + logger.info(f"已加载 ROI 配置(含绑定): {len(result)} 个") return result - + except Exception as e: - logger.error(f"获取ROI配置(含绑定)失败: {e}") - cached = self._cache.get(cache_key) - return cached or [] - + logger.error(f"获取 ROI 配置(含绑定)失败: {e}") + return self._cache.get(cache_key) or [] + def get_camera_rois(self, camera_id: str) -> List[ROIInfo]: - """获取指定摄像头的ROI配置""" return self.get_roi_configs(camera_id=camera_id) - + def get_config_by_id(self, config_type: str, config_id: str) -> Optional[Dict[str, Any]]: - """根据ID获取配置""" self._init_database() - try: if config_type == "camera": - camera = self._db_manager.get_camera_config(config_id) - return camera if camera else None + return self._db_manager.get_camera_config(config_id) elif config_type == "roi": - roi = self._db_manager.get_roi_config(config_id) - return roi if roi else None + return self._db_manager.get_roi_config(config_id) except Exception as e: logger.error(f"获取配置失败: {e}") return None - - def publish_config_update(self, update_data: Dict[str, Any]) -> bool: - """发布配置更新通知""" - if not self._redis_client: - logger.warning("Redis未连接,无法发布配置更新") - return False - + + def get_current_config(self) -> Optional[dict]: + """获取当前完整配置(从本地 Redis)""" + if not self._local_redis: + return None try: - update_data["version"] = self._config_version - update_data["timestamp"] = datetime.now().isoformat() - - self._redis_client.publish("config_update", json.dumps(update_data)) - logger.info(f"已发布配置更新: {update_data}") - return True - + config_json = self._local_redis.get(LOCAL_CONFIG_CURRENT) + return json.loads(config_json) if config_json else None except Exception as e: - logger.error(f"发布配置更新失败: {e}") - return False - + logger.error(f"获取当前配置失败: {e}") + return None + + # ==================== 缓存管理 ==================== + def invalidate_cache(self, cache_key: str): - """使指定缓存失效""" self._cache.delete(cache_key) - logger.info(f"缓存已失效: {cache_key}") - + def invalidate_all_cache(self): - """使所有缓存失效""" self._cache.clear() - logger.info("所有缓存已失效") - + def get_cache_stats(self) -> Dict[str, Any]: - """获取缓存统计信息""" return self._cache.get_stats() - + + # ==================== 健康状态 ==================== + def get_health_status(self) -> Dict[str, Any]: - """获取健康状态""" - redis_healthy = False - if self._redis_client: + cloud_healthy = False + if self._cloud_redis: try: - self._redis_client.ping() - redis_healthy = True + self._cloud_redis.ping() + cloud_healthy = True except Exception: pass - + + local_healthy = False + if self._local_redis: + try: + self._local_redis.ping() + local_healthy = True + except Exception: + pass + return { - "redis_connected": redis_healthy, + "cloud_redis_connected": cloud_healthy, + "local_redis_connected": local_healthy, "config_version": self._config_version, + "local_version": self.get_config_version(), "cache_stats": self.get_cache_stats(), - "subscription_active": ( - self._pubsub_thread is not None and - self._pubsub_thread.is_alive() + "stream_listener_active": ( + self._stream_thread is not None and + self._stream_thread.is_alive() ), } - - def _cache_roi_to_redis(self, roi_config: Dict[str, Any]) -> bool: - """将ROI配置缓存到Redis""" - if not self._redis_client: - return False - - try: - roi_id = roi_config.get("roi_id") - key = f"config:roi:{roi_id}" - - self._redis_client.hset(key, mapping={ - "roi_id": roi_id, - "camera_id": roi_config.get("camera_id", ""), - "roi_type": roi_config.get("roi_type", ""), - "coordinates": str(roi_config.get("coordinates", [])), - "enabled": str(roi_config.get("enabled", True)), - "priority": str(roi_config.get("priority", 0)), - }) - - self._redis_client.expire(key, 3600) - logger.debug(f"ROI配置已缓存到Redis: {key}") - return True - except Exception as e: - logger.error(f"缓存ROI配置到Redis失败: {e}") - return False - - def _cache_algo_bind_to_redis(self, bind_config: Dict[str, Any]) -> bool: - """将ROI算法绑定配置缓存到Redis""" - if not self._redis_client: - return False - - try: - bind_id = bind_config.get("bind_id") - key = f"config:bind:{bind_id}" - - self._redis_client.hset(key, mapping={ - "bind_id": bind_id, - "roi_id": bind_config.get("roi_id", ""), - "algo_code": bind_config.get("algo_code", ""), - "params": str(bind_config.get("params", {})), - "priority": str(bind_config.get("priority", 0)), - "enabled": str(bind_config.get("enabled", True)), - "algo_name": bind_config.get("algo_name", ""), - "target_class": bind_config.get("target_class", "person"), - }) - - self._redis_client.expire(key, 3600) - logger.debug(f"ROI算法绑定配置已缓存到Redis: {key}") - return True - except Exception as e: - logger.error(f"缓存ROI算法绑定配置到Redis失败: {e}") - return False - - def _cache_camera_to_redis(self, camera_config: Dict[str, Any]) -> bool: - """将摄像头配置缓存到Redis""" - if not self._redis_client: - return False - - try: - camera_id = camera_config.get("camera_id") - key = f"config:camera:{camera_id}" - - self._redis_client.hset(key, mapping={ - "camera_id": camera_id, - "rtsp_url": camera_config.get("rtsp_url", ""), - "camera_name": camera_config.get("camera_name", ""), - "enabled": str(camera_config.get("enabled", True)), - "location": camera_config.get("location", ""), - }) - - self._redis_client.expire(key, 3600) - logger.debug(f"摄像头配置已缓存到Redis: {key}") - return True - except Exception as e: - logger.error(f"缓存摄像头配置到Redis失败: {e}") - return False - - def sync_all_to_redis(self) -> int: - """同步所有配置到Redis缓存""" - if not self._redis_client: - return 0 - - count = 0 - try: - cameras = self._db_manager.get_all_camera_configs() - for camera in cameras: - if self._cache_camera_to_redis(camera): - count += 1 - - rois = self._db_manager.get_all_roi_configs() - for roi in rois: - if self._cache_roi_to_redis(roi): - count += 1 - - self.clear_redis_cache("bind") - bindings = self._db_manager.get_bindings_by_camera("") - for bind in bindings: - if self._cache_algo_bind_to_redis(bind): - count += 1 - - logger.info(f"已同步 {count} 条配置到Redis缓存") - return count - except Exception as e: - logger.error(f"同步配置到Redis失败: {e}") - return count - - def get_roi_from_redis(self, roi_id: str) -> Optional[Dict[str, Any]]: - """从Redis获取ROI配置""" - if not self._redis_client: - return None - - try: - key = f"config:roi:{roi_id}" - data = self._redis_client.hgetall(key) - if data: - data['coordinates'] = eval(data['coordinates']) if data.get('coordinates') else [] - data['priority'] = int(data.get('priority', 0)) - data['enabled'] = data.get('enabled', 'True') == 'True' - return data - return None - except Exception as e: - logger.error(f"从Redis获取ROI配置失败: {e}") - return None - - def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]: - """从Redis获取ROI算法绑定配置""" - if not self._redis_client: - return None - - try: - key = f"config:bind:{bind_id}" - data = self._redis_client.hgetall(key) - if data: - data['params'] = eval(data['params']) if data.get('params') else {} - data['priority'] = int(data.get('priority', 0)) - data['enabled'] = data.get('enabled', 'True') == 'True' - data['target_class'] = data.get('target_class', 'person') - return data - return None - except Exception as e: - logger.error(f"从Redis获取ROI算法绑定配置失败: {e}") - return None - - def get_bindings_from_redis(self, roi_id: str) -> List[Dict[str, Any]]: - """从Redis获取ROI的所有算法绑定""" - if not self._redis_client: - return [] - - try: - pattern = "config:bind:*" - keys = self._redis_client.keys(pattern) - results = [] - for key in keys: - data = self._redis_client.hgetall(key) - if data and (not roi_id or data.get('roi_id') == roi_id): - data['params'] = eval(data['params']) if data.get('params') else {} - data['priority'] = int(data.get('priority', 0)) - data['enabled'] = data.get('enabled', 'True') == 'True' - data['target_class'] = data.get('target_class', 'person') - results.append(data) - return sorted(results, key=lambda x: x['priority'], reverse=True) - except Exception as e: - logger.error(f"从Redis获取ROI算法绑定列表失败: {e}") - return [] - - def get_camera_from_redis(self, camera_id: str) -> Optional[Dict[str, Any]]: - """从Redis获取摄像头配置""" - if not self._redis_client: - return None - - try: - key = f"config:camera:{camera_id}" - data = self._redis_client.hgetall(key) - if data: - data['enabled'] = data.get('enabled', 'True') == 'True' - return data - return None - except Exception as e: - logger.error(f"从Redis获取摄像头配置失败: {e}") - return None - - def notify_config_change(self, config_type: str, config_ids: List[str]): - """通知配置变更(发布到Redis频道)""" - if not self._redis_client: - return - - try: - message = { - "type": config_type, - "ids": config_ids, - "timestamp": datetime.now().isoformat(), - } - self._redis_client.publish("config_update", json.dumps(message)) - logger.info(f"已发布配置变更通知: {config_type} - {config_ids}") - except Exception as e: - logger.error(f"发布配置变更通知失败: {e}") - - def clear_redis_cache(self, config_type: Optional[str] = None): - """清理Redis缓存""" - if not self._redis_client: - return - - try: - if config_type == "roi": - keys = self._redis_client.keys("config:roi:*") - if keys: - self._redis_client.delete(*keys) - elif config_type == "camera": - keys = self._redis_client.keys("config:camera:*") - if keys: - self._redis_client.delete(*keys) - elif config_type == "bind": - keys = self._redis_client.keys("config:bind:*") - if keys: - self._redis_client.delete(*keys) - else: - keys = self._redis_client.keys("config:*") - if keys: - self._redis_client.delete(*keys) - logger.info(f"已清理Redis缓存: {config_type or 'all'}") - except Exception as e: - logger.error(f"清理Redis缓存失败: {e}") - - def reload_algorithms(self): - """重新加载所有算法配置""" - self.invalidate_all_cache() - self.clear_redis_cache() - count = self.sync_all_to_redis() - self.notify_config_change("roi", []) - self.notify_config_change("bind", []) - logger.info(f"算法配置已重新加载,更新了 {count} 条缓存") - + + # ==================== 关闭 ==================== + def close(self): - """关闭管理器""" + """关闭管理器,释放所有连接""" self.stop_config_subscription() - if self._redis_client: - if self._redis_pubsub: - self._redis_pubsub.close() - self._redis_client.close() - logger.info("Redis连接已关闭") + + if self._cloud_redis: + try: + self._cloud_redis.close() + except Exception: + pass + logger.info("云端 Redis 连接已关闭") + + if self._local_redis: + try: + self._local_redis.close() + except Exception: + pass + logger.info("本地 Redis 连接已关闭") def get_config_sync_manager() -> ConfigSyncManager: diff --git a/core/result_reporter.py b/core/result_reporter.py index 1a5ef64..b622ec6 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -1,481 +1,219 @@ """ 结果上报模块 -支持 MQTT 推送和本地 SQLite 存储 +使用本地 Redis 缓冲 + HTTP 上报替代 MQTT -存储策略: -- MQTT: 实时推送告警到云端 -- SQLite: 本地异步存储(边缘侧断网容灾) -- 断网时自动缓存到本地,恢复后自动同步 +告警流程: + 算法产出告警 → report_alarm() LPUSH 到 Redis → AlarmUploadWorker 异步消费 """ import json import logging -import threading -import time import uuid -from datetime import datetime -from typing import Any, Dict, List, Optional, Callable +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional from dataclasses import dataclass, field -from pathlib import Path import numpy as np -import paho.mqtt.client as mqtt -from paho.mqtt.client import MQTTMessage +import redis + +from config.settings import get_settings logger = logging.getLogger(__name__) @dataclass -class AlertInfo: - """告警信息类""" - alert_id: str - camera_id: str - roi_id: str - alert_type: str - bind_id: Optional[str] = None - device_id: Optional[str] = None - algorithm: Optional[str] = None - target_class: Optional[str] = None - confidence: Optional[float] = None - bbox: Optional[List[float]] = field(default_factory=list) - message: Optional[str] = None - screenshot: Optional[np.ndarray] = None - timestamp: datetime = field(default_factory=datetime.now) - duration_minutes: Optional[float] = None +class AlarmInfo: + """告警信息类(新格式,对齐云端 alarm_event 表)""" + alarm_id: str + alarm_type: str + device_id: str + scene_id: str + event_time: str # ISO8601 + alarm_level: int # 1-4 + snapshot_local_path: Optional[str] = None + algorithm_code: Optional[str] = None + confidence_score: Optional[float] = None + ext_data: Optional[Dict[str, Any]] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: - """转换为字典(发送到告警平台的 MQTT 消息格式)""" + """转换为字典(写入 Redis 的 JSON 格式)""" return { - "alert_id": self.alert_id, - "camera_id": self.camera_id, - "roi_id": self.roi_id, - "bind_id": self.bind_id, + "alarm_id": self.alarm_id, + "alarm_type": self.alarm_type, "device_id": self.device_id, - "alert_type": self.alert_type, - "algorithm": self.algorithm, - "target_class": self.target_class, - "confidence": self.confidence, - "bbox": self.bbox, - "message": self.message, - "timestamp": self.timestamp.isoformat(), - "duration_minutes": self.duration_minutes, + "scene_id": self.scene_id, + "event_time": self.event_time, + "alarm_level": self.alarm_level, + "snapshot_local_path": self.snapshot_local_path, + "algorithm_code": self.algorithm_code, + "confidence_score": self.confidence_score, + "ext_data": self.ext_data, } +def generate_alarm_id(device_id: str) -> str: + """ + 生成告警ID + 格式: edge_{device_id}_{YYYYMMDDHHmmss}_{6位uuid} + """ + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + uid = uuid.uuid4().hex[:6] + return f"edge_{device_id}_{timestamp}_{uid}" + + +# Redis 队列 Key 常量 +REDIS_KEY_ALARM_PENDING = "local:alarm:pending" +REDIS_KEY_ALARM_RETRY = "local:alarm:retry" +REDIS_KEY_ALARM_DEAD = "local:alarm:dead" + + class ResultReporter: - """结果上报器类""" - - def __init__( - self, - mqtt_broker: str = "localhost", - mqtt_port: int = 1883, - topic_prefix: str = "edge/alert", - mqtt_client_id: str = "edge_reporter" - ): - self._mqtt_broker = mqtt_broker - self._mqtt_port = mqtt_port - self._topic_prefix = topic_prefix - self._mqtt_client_id = mqtt_client_id - - self._client: Optional[mqtt.Client] = None - self._connected = False - self._reconnect_count = 0 - self._lock = threading.Lock() - + """结果上报器 + + 将告警写入本地 Redis 队列,由 AlarmUploadWorker 异步消费上传。 + report_alarm() 方法使用 LPUSH,算法线程零阻塞。 + """ + + def __init__(self): + self._settings = get_settings() + self._redis: Optional[redis.Redis] = None + self._logger = logging.getLogger("result_reporter") + self._performance_stats = { "alerts_generated": 0, - "alerts_sent": 0, - "alerts_stored": 0, - "send_failures": 0, + "alerts_queued": 0, + "queue_failures": 0, } - - self._logger = logging.getLogger("result_reporter") - - self._db_manager = None + + # 图片存储(本地保存截图供 worker 读取上传) self._image_storage = None - self._local_cache = None - - self._logger.info("ResultReporter 初始化完成") - + self._db_manager = None + + self._logger.info("ResultReporter 初始化完成(Redis 缓冲模式)") + def initialize(self): - """初始化存储和MQTT""" - from config.database import SQLiteManager, AlertRecord, get_sqlite_manager - from core.storage_manager import get_image_storage, get_local_cache - - self._logger.info("初始化存储管理器...") - - self._db_manager = get_sqlite_manager() - self._image_storage = get_image_storage() - self._local_cache = get_local_cache() - - self._logger.info("存储管理器初始化完成") - - self._init_mqtt() - - def _init_mqtt(self): - """初始化MQTT客户端""" - self._logger.info(f"正在连接 MQTT: {self._mqtt_broker}:{self._mqtt_port}") + """初始化 Redis 连接和本地存储""" + # 初始化本地存储(截图保存) try: - # 给 client_id 添加随机后缀,防止冲突 - unique_client_id = f"{self._mqtt_client_id}_{uuid.uuid4().hex[:8]}" - - # 兼容不同版本的 paho-mqtt - try: - # paho-mqtt 2.0+ 版本 - self._client = mqtt.Client( - client_id=unique_client_id, - protocol=mqtt.MQTTv311, - callback_api_version=mqtt.CallbackAPIVersion.VERSION2 - ) - except (AttributeError, TypeError): - # paho-mqtt 1.x 版本 - self._client = mqtt.Client( - client_id=unique_client_id, - protocol=mqtt.MQTTv311 - ) - - self._client.on_connect = self._on_connect - self._client.on_disconnect = self._on_disconnect - self._client.on_publish = self._on_publish - - self._client.connect(self._mqtt_broker, self._mqtt_port, 60) - self._client.loop_start() - - self._logger.info(f"MQTT 客户端初始化完成: {self._mqtt_broker}:{self._mqtt_port}") - + from core.storage_manager import get_image_storage + self._image_storage = get_image_storage() except Exception as e: - self._logger.error(f"MQTT 初始化失败: {e}") - import traceback - self._logger.error(traceback.format_exc()) - self._client = None - - def _on_connect(self, client, userdata, flags, reason_code, properties=None): - """MQTT连接回调(兼容 1.x 和 2.x)""" - # paho-mqtt 1.x: rc 是整数,0 表示成功 - # paho-mqtt 2.x: reason_code 是对象,需要检查 .value 或直接比较 - rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) - if rc == 0: - self._connected = True - self._reconnect_count = 0 - self._logger.info("MQTT 连接成功") - - if self._local_cache: - pending = self._local_cache.get_pending_count() - if pending > 0: - self._logger.info(f"有 {pending} 条待同步的缓存告警") - else: - self._logger.warning(f"MQTT 连接失败: {reason_code}") - - def _on_disconnect(self, client, userdata, reason_code, properties=None): - """MQTT断开连接回调(兼容 1.x 和 2.x)""" - self._connected = False - self._logger.warning(f"MQTT 连接断开: {reason_code}") + self._logger.warning(f"本地图片存储初始化失败: {e}") - def _on_publish(self, client, userdata, mid, reason_code=None, properties=None): - """MQTT发布回调(兼容 1.x 和 2.x)""" - rc = 0 if reason_code is None else (reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)) - if rc == 0: - self._logger.debug(f"MQTT 消息发布成功: {mid}") - - def report_alert( - self, - alert: AlertInfo, - store_to_db: bool = True, - screenshot: Optional[np.ndarray] = None - ) -> bool: + try: + from config.database import get_sqlite_manager + self._db_manager = get_sqlite_manager() + except Exception as e: + self._logger.warning(f"本地数据库初始化失败: {e}") + + # 初始化 Redis 连接(使用本地 Redis) + redis_cfg = self._settings.local_redis + try: + self._redis = redis.Redis( + host=redis_cfg.host, + port=redis_cfg.port, + db=redis_cfg.db, + password=redis_cfg.password, + decode_responses=True, + socket_connect_timeout=5, + ) + self._redis.ping() + self._logger.info( + f"Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}" + ) + except Exception as e: + self._logger.error(f"Redis 连接失败: {e}") + self._redis = None + + def report_alarm(self, alarm_info: AlarmInfo, screenshot: Optional[np.ndarray] = None) -> bool: """ - 上报告警 - + 上报告警(写入 Redis 队列) + Args: - alert: 告警信息 - store_to_db: 是否存储到本地数据库 - screenshot: 抓拍图片 - + alarm_info: 告警信息 + screenshot: 抓拍图片(numpy 数组) + Returns: - 是否上报成功 + 是否成功写入队列 """ self._performance_stats["alerts_generated"] += 1 - # MQTT 发布和本地存储独立执行,互不影响 - mqtt_ok = False - try: - if self._connected and self._client: - self._publish_alert(alert) - mqtt_ok = True - else: - self._logger.warning("MQTT 未连接,消息已加入待发送队列") - if self._local_cache: - self._local_cache.cache_alert(alert.to_dict()) - except Exception as e: - self._performance_stats["send_failures"] += 1 - self._logger.error(f"MQTT 发布告警失败: {e}") - - try: - if store_to_db and self._db_manager: - self._store_alert(alert, screenshot) - except Exception as e: - self._logger.error(f"本地存储告警失败: {e}") - - if mqtt_ok: - self._performance_stats["alerts_sent"] += 1 - - return mqtt_ok - - def _store_alert(self, alert: AlertInfo, screenshot: Optional[np.ndarray] = None): - """存储告警到本地数据库(异步)""" - image_path = None - + # 保存截图到本地,获取本地路径 if screenshot is not None and self._image_storage: - image_path = self._image_storage.save_capture( - image=screenshot, - camera_id=alert.camera_id, - alert_id=alert.alert_id, - timestamp=alert.timestamp - ) - - record = AlertRecord( - alert_id=alert.alert_id, - camera_id=alert.camera_id, - roi_id=alert.roi_id, - alert_type=alert.alert_type, - target_class=alert.target_class, - confidence=alert.confidence, - bbox=alert.bbox, - message=alert.message, - image_path=image_path, - status="pending", - created_at=alert.timestamp, - duration_minutes=alert.duration_minutes, - ) - - if self._db_manager: - self._db_manager.queue_alert(record) - self._performance_stats["alerts_stored"] += 1 - - def _publish_alert(self, alert: AlertInfo): - """发布告警到MQTT""" - alert_data = alert.to_dict() - - topic = f"{self._topic_prefix}/{alert.camera_id}/{alert.roi_id}" - - result = self._client.publish(topic, json.dumps(alert_data, ensure_ascii=False)) - - if result[0] == mqtt.MQTT_ERR_SUCCESS: - self._logger.info( - f"告警已发布: type={alert.alert_type}, " - f"camera={alert.camera_id}, roi={alert.roi_id}, " - f"confidence={alert.confidence}" - ) - else: - raise Exception(f"MQTT 发布失败: {result[0]}") - - def report_heartbeat( - self, - device_id: str, - status: Dict[str, Any] - ) -> bool: - """上报心跳""" - try: - heartbeat_data = { - "device_id": device_id, - "status": status, - "timestamp": datetime.now().isoformat(), - } - - topic = f"{self._topic_prefix}/heartbeat/{device_id}" - - if self._client and self._connected: - result = self._client.publish(topic, json.dumps(heartbeat_data, ensure_ascii=False)) - if result[0] == mqtt.MQTT_ERR_SUCCESS: - self._logger.debug(f"心跳上报成功: {device_id}") - return True - - self._logger.warning(f"心跳上报失败(MQTT未连接): {device_id}") - return False - except Exception as e: - self._logger.error(f"心跳上报异常: {e}") - return False - - def get_pending_alerts(self) -> List[Dict[str, Any]]: - """获取待同步的告警""" - if self._local_cache: - return self._local_cache.get_pending_alerts() - return [] - - def sync_pending_alerts(self) -> int: - """同步待处理的告警到云端""" - if not self._connected or not self._client: - return 0 - - pending = self.get_pending_alerts() - synced = 0 - - for alert in pending: try: - self._client.publish( - f"{self._topic_prefix}/{alert['camera_id']}/{alert['roi_id']}", - json.dumps(alert, ensure_ascii=False) + local_path = self._image_storage.save_capture( + image=screenshot, + camera_id=alarm_info.device_id, + alert_id=alarm_info.alarm_id, + timestamp=datetime.now(), ) - if self._local_cache: - self._local_cache.remove_cached(alert.get('_cache_id', '')) - synced += 1 + if local_path: + alarm_info.snapshot_local_path = local_path except Exception as e: - self._logger.error(f"同步告警失败: {e}") - - if synced > 0: - self._logger.info(f"已同步 {synced} 条告警到云端") - - return synced - + self._logger.error(f"保存截图失败: {e}") + + # 写入 Redis 队列 + if self._redis is None: + self._performance_stats["queue_failures"] += 1 + self._logger.error("Redis 未连接,无法写入告警队列") + return False + + try: + alarm_json = json.dumps(alarm_info.to_dict(), ensure_ascii=False) + self._redis.lpush(REDIS_KEY_ALARM_PENDING, alarm_json) + self._performance_stats["alerts_queued"] += 1 + + self._logger.info( + f"告警已入队: alarm_id={alarm_info.alarm_id}, " + f"type={alarm_info.alarm_type}, device={alarm_info.device_id}" + ) + return True + + except Exception as e: + self._performance_stats["queue_failures"] += 1 + self._logger.error(f"写入 Redis 队列失败: {e}") + return False + def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" stats = self._performance_stats.copy() - - if self._db_manager: - db_stats = self._db_manager.get_statistics() - stats.update({ - "db_total_alerts": db_stats.get("total_alerts", 0), - "db_pending_alerts": db_stats.get("pending_alerts", 0), - }) - - if self._image_storage: - img_stats = self._image_storage.get_statistics() - stats["image_saved"] = img_stats.get("saved_count", 0) - - if self._local_cache: - stats["pending_sync"] = self._local_cache.get_pending_count() - - stats["mqtt_connected"] = self._connected - + + if self._redis: + try: + stats["pending_count"] = self._redis.llen(REDIS_KEY_ALARM_PENDING) + stats["retry_count"] = self._redis.llen(REDIS_KEY_ALARM_RETRY) + stats["dead_count"] = self._redis.llen(REDIS_KEY_ALARM_DEAD) + except Exception: + pass + + stats["redis_connected"] = self._redis is not None + return stats - - def cleanup(self): - """清理资源""" - self._logger.info("ResultReporter 资源清理") - - if self._image_storage: - self._image_storage.close() - - if self._db_manager: - self._db_manager.close() - - if self._client: - self._client.loop_stop() - self._client.disconnect() - + def close(self): - """关闭上报器(别名)""" - self.cleanup() - - if self._client: - self._client.loop_stop() - self._client.disconnect() - + """关闭上报器""" + self._logger.info("ResultReporter 资源清理") + + if self._image_storage: + try: + self._image_storage.close() + except Exception: + pass + + if self._db_manager: + try: + self._db_manager.close() + except Exception: + pass + + if self._redis: + try: + self._redis.close() + except Exception: + pass + self._logger.info("ResultReporter 清理完成") - -class AlertReporter: - """告警上报器(简化版)""" - - def __init__(self, topic_prefix: str = "edge/alert"): - self._topic_prefix = topic_prefix - self._lock = threading.Lock() - self._performance_stats = { - "alerts_generated": 0, - "alerts_sent": 0, - "alerts_stored": 0, - "send_failures": 0, - } - self._logger = logging.getLogger("alert_reporter") - - def initialize(self): - """初始化""" - from config.database import get_sqlite_manager - from core.storage_manager import get_image_storage - - self._db_manager = get_sqlite_manager() - self._image_storage = get_image_storage() - self._logger.info("AlertReporter 初始化完成") - - def report_alert( - self, - alert: AlertInfo, - store_to_db: bool = True, - screenshot: Optional[np.ndarray] = None - ) -> bool: - """ - 上报告警 - - Args: - alert: 告警信息 - store_to_db: 是否存储到本地数据库 - screenshot: 抓拍图片 - - Returns: - 是否成功 - """ - with self._lock: - self._performance_stats["alerts_generated"] += 1 - - try: - if store_to_db and self._db_manager: - self._store_alert(alert, screenshot) - - self._logger.info( - f"告警已记录: {alert.alert_type} - {alert.camera_id}/{alert.roi_id}" - ) - - with self._lock: - self._performance_stats["alerts_sent"] += 1 - - return True - - except Exception as e: - with self._lock: - self._performance_stats["send_failures"] += 1 - self._logger.error(f"上报告警失败: {e}") - return False - - def _store_alert(self, alert: AlertInfo, screenshot: Optional[np.ndarray] = None): - """存储告警""" - from config.database import AlertRecord, get_sqlite_manager - - image_path = None - - if screenshot is not None and self._image_storage: - image_path = self._image_storage.save_capture( - image=screenshot, - camera_id=alert.camera_id, - alert_id=alert.alert_id, - timestamp=alert.timestamp - ) - - record = AlertRecord( - alert_id=alert.alert_id, - camera_id=alert.camera_id, - roi_id=alert.roi_id, - alert_type=alert.alert_type, - target_class=alert.target_class, - confidence=alert.confidence, - bbox=alert.bbox, - message=alert.message, - image_path=image_path, - status="pending", - created_at=alert.timestamp, - ) - - db_manager = get_sqlite_manager() - db_manager.queue_alert(record) - - with self._lock: - self._performance_stats["alerts_stored"] += 1 - - def get_statistics(self) -> Dict[str, Any]: - """获取统计""" - with self._lock: - return dict(self._performance_stats) - - -def create_alert_reporter(topic_prefix: str = "edge/alert") -> AlertReporter: - """创建告警上报器""" - return AlertReporter(topic_prefix=topic_prefix) + def cleanup(self): + """清理资源(别名)""" + self.close() diff --git a/main.py b/main.py index aa53f65..4ecf002 100644 --- a/main.py +++ b/main.py @@ -19,6 +19,7 @@ from core.preprocessor import ImagePreprocessor from core.tensorrt_engine import TensorRTEngine, EngineManager from core.postprocessor import PostProcessor from core.result_reporter import ResultReporter +from core.alarm_upload_worker import AlarmUploadWorker from algorithms import AlgorithmManager from utils.logger import get_logger, StructuredLogger from utils.version_control import get_version_control @@ -45,6 +46,7 @@ class EdgeInferenceService: self._engine_manager: Optional[EngineManager] = None self._postprocessor: Optional[PostProcessor] = None self._reporter: Optional[ResultReporter] = None + self._alarm_worker: Optional[AlarmUploadWorker] = None self._algorithm_manager: Optional[AlgorithmManager] = None self._processing_threads: Dict[str, threading.Thread] = {} @@ -129,12 +131,22 @@ class EdgeInferenceService: try: self._reporter = ResultReporter() self._logger.info("ResultReporter 对象已创建,准备初始化...") - self._reporter.initialize() # 初始化存储和MQTT连接 + self._reporter.initialize() # 初始化 Redis 连接和本地存储 self._logger.info("结果上报器初始化成功") except Exception as e: self._logger.error(f"结果上报器初始化失败: {e}") import traceback self._logger.error(traceback.format_exc()) + + # 启动告警上报 Worker + try: + self._alarm_worker = AlarmUploadWorker() + self._alarm_worker.start() + self._logger.info("告警上报 Worker 启动成功") + except Exception as e: + self._logger.error(f"告警上报 Worker 启动失败: {e}") + import traceback + self._logger.error(traceback.format_exc()) def _init_algorithm_manager(self): """初始化算法管理器""" @@ -199,9 +211,9 @@ class EdgeInferenceService: try: roi_configs = self._config_manager.get_roi_configs_with_bindings(camera_id) - # 每100帧打印一次状态 + # 每100帧打印一次状态(非告警诊断日志,使用 DEBUG 级别) if self._performance_stats["total_frames_processed"] % 100 == 0: - self._logger.info(f"[{camera_id}] 已处理 {self._performance_stats['total_frames_processed']} 帧, ROI数: {len(roi_configs)}") + self._logger.debug(f"[{camera_id}] 已处理 {self._performance_stats['total_frames_processed']} 帧, ROI数: {len(roi_configs)}") roi_items = [] for roi in roi_configs: @@ -260,13 +272,13 @@ class EdgeInferenceService: # 一次性推理整个 batch outputs, inference_time_ms = engine.infer(batch_data) - # 诊断:输出原始推理结果形状 + # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) import numpy as np if isinstance(outputs, np.ndarray): - self._logger.info(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") elif isinstance(outputs, (list, tuple)): shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs] - self._logger.info(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") batch_size = len(roi_items) batch_results = self._postprocessor.batch_process_detections( @@ -276,7 +288,7 @@ class EdgeInferenceService: ) total_detections = sum(len(r[0]) for r in batch_results) - self._logger.info(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") + self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(roi_items): boxes, scores, class_ids = batch_results[idx] @@ -350,9 +362,9 @@ class EdgeInferenceService: algo_code = bind.algo_code algo_params = bind.params or {} - # 诊断日志:检测到目标(使用 INFO 级别确保能看到) + # 诊断日志:检测到目标(非告警诊断日志,使用 DEBUG 级别) if len(boxes) > 0: - self._logger.info(f"[{camera_id}] ROI={roi_id[:8]} 检测到 {len(boxes)} 个目标, algo={algo_code}") + self._logger.debug(f"[{camera_id}] ROI={roi_id[:8]} 检测到 {len(boxes)} 个目标, algo={algo_code}") self._algorithm_manager.register_algorithm( roi_id=roi_id, @@ -366,8 +378,8 @@ class EdgeInferenceService: if not tracks: return - # 诊断日志:tracks 内容(INFO 级别) - self._logger.info(f"[{camera_id}] tracks: {[t.get('class') for t in tracks]}, target_class={bind.target_class}") + # 诊断日志:tracks 内容(非告警诊断日志,使用 DEBUG 级别) + self._logger.debug(f"[{camera_id}] tracks: {[t.get('class') for t in tracks]}, target_class={bind.target_class}") alerts = self._algorithm_manager.process( roi_id=roi_id, @@ -382,9 +394,9 @@ class EdgeInferenceService: if alerts: self._logger.info(f"[{camera_id}] 算法 {algo_code} 返回 {len(alerts)} 个告警") else: - # 获取算法状态用于诊断 + # 获取算法状态用于诊断(非告警诊断日志,使用 DEBUG 级别) algo_status = self._algorithm_manager.get_status(roi_id) - self._logger.info(f"[{camera_id}] 算法 {algo_code} 无告警, 状态: {algo_status}") + self._logger.debug(f"[{camera_id}] 算法 {algo_code} 无告警, 状态: {algo_status}") for alert in alerts: alert_type = alert.get("alert_type", "detection") @@ -396,7 +408,7 @@ class EdgeInferenceService: if last_alert_time is not None: elapsed = (now - last_alert_time).total_seconds() if elapsed < self._camera_cooldown_seconds: - self._logger.info( + self._logger.debug( f"[去重] 跳过告警: camera={camera_id}, type={alert_type}, " f"roi={roi_id}, 距上次={elapsed:.1f}s < {self._camera_cooldown_seconds}s" ) @@ -405,23 +417,27 @@ class EdgeInferenceService: self._camera_alert_cooldown[dedup_key] = now self._performance_stats["total_alerts_generated"] += 1 - from core.result_reporter import AlertInfo - alert_info = AlertInfo( - alert_id=f"{roi_id}_{bind.bind_id}_{int(frame.timestamp.timestamp())}", - camera_id=camera_id, - roi_id=roi_id, - bind_id=bind.bind_id, - device_id=self._settings.mqtt.device_id, - alert_type=alert_type, - algorithm=algo_code, - target_class=alert.get("class", bind.target_class or "unknown"), - confidence=alert.get("confidence", 1.0), - bbox=alert.get("bbox", []), - message=alert.get("message", ""), - timestamp=frame.timestamp, - duration_minutes=alert.get("duration_minutes"), + from core.result_reporter import AlarmInfo, generate_alarm_id + alarm_info = AlarmInfo( + alarm_id=generate_alarm_id(self._settings.mqtt.device_id), + alarm_type=alert_type, + device_id=camera_id, + scene_id=roi_id, + event_time=frame.timestamp.isoformat(), + alarm_level=alert.get("alarm_level", 2), + algorithm_code=algo_code, + confidence_score=alert.get("confidence", 1.0), + ext_data={ + "duration_ms": int(alert.get("duration_minutes", 0) * 60 * 1000) if alert.get("duration_minutes") else None, + "roi_id": roi_id, + "bbox": alert.get("bbox", []), + "target_class": alert.get("class", bind.target_class or "unknown"), + "bind_id": bind.bind_id, + "message": alert.get("message", ""), + "edge_node_id": self._settings.mqtt.device_id, + }, ) - self._reporter.report_alert(alert_info, screenshot=frame.image) + self._reporter.report_alarm(alarm_info, screenshot=frame.image) self._logger.info( f"告警已生成: type={alert_type}, " @@ -474,42 +490,13 @@ class EdgeInferenceService: self._logger.info("推理线程已启动") self._stream_manager.start_all() - + self._logger.info("Edge_Inference_Service 已启动") - - self._start_heartbeat_thread() - + self._register_signal_handlers() - + self._wait_for_shutdown() - def _start_heartbeat_thread(self): - """启动心跳线程""" - def heartbeat(): - while not self._stop_event.is_set(): - try: - uptime = (datetime.now() - self._performance_stats["start_time"]).total_seconds() - self._performance_stats["uptime_seconds"] = uptime - - status = { - "running": True, - "uptime_seconds": uptime, - "frames_processed": self._performance_stats["total_frames_processed"], - "alerts_generated": self._performance_stats["total_alerts_generated"], - "stream_stats": self._stream_manager.get_statistics() if self._stream_manager else {}, - } - - if self._reporter: - self._reporter.report_heartbeat(self._settings.mqtt.device_id, status) - - except Exception as e: - self._logger.error(f"心跳上报失败: {e}") - - time.sleep(30) - - thread = threading.Thread(target=heartbeat, name="Heartbeat", daemon=True) - thread.start() - def _register_signal_handlers(self): """注册信号处理器""" def handle_signal(signum, frame): @@ -549,7 +536,10 @@ class EdgeInferenceService: if self._algorithm_manager: self._algorithm_manager.stop_config_subscription() - + + if self._alarm_worker: + self._alarm_worker.stop() + if self._reporter: self._reporter.close() diff --git a/utils/logger.py b/utils/logger.py index ada4e97..e8faa00 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -84,6 +84,34 @@ class StructuredLogger: self._log_dir = "./logs" self._init_logger() + # 非告警模块列表:这些模块的 INFO/DEBUG 日志会被抑制,只显示 WARNING 及以上 + # 告警相关模块(alarm_upload_worker, result_reporter, main 等)保持原始日志级别 + _QUIET_LOGGERS = [ + # 视频流 / 帧处理 + "core.video_stream", + "video_stream", + "multi_stream", + # 图像预处理 + "core.preprocessor", + "preprocessor", + # 后处理(NMS、坐标映射、告警状态机) + "core.postprocessor", + "postprocessor", + # TensorRT 推理引擎 + "core.tensorrt_engine", + "tensorrt", + # 配置同步 + "core.config_sync", + # 数据库 + "config.database", + # 算法管理器(注册、配置订阅) + "algorithms", + # 图片存储 + "core.storage_manager", + # 版本控制 + "utils.version_control", + ] + def _init_logger(self): """初始化日志配置""" settings = get_settings() @@ -109,6 +137,12 @@ class StructuredLogger: console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) + # 抑制非告警模块的 INFO/DEBUG 日志,只保留 WARNING 及以上 + # 告警相关模块(alarm_upload_worker, result_reporter, main 等)不受影响 + for logger_name in self._QUIET_LOGGERS: + quiet_logger = logging.getLogger(logger_name) + quiet_logger.setLevel(logging.WARNING) + # 配置命名 logger(主模块专用,写入独立日志文件) self._logger = logging.getLogger(self.name) self._logger.setLevel(self._log_level)