"""
Alarm upload worker.

Runs in its own thread and consumes alarms from Redis queues:
1. BRPOP the next pending alarm
2. Upload its snapshot to Tencent Cloud COS
3. HTTP POST the alarm metadata to the cloud
4. Retry on failure with exponential back-off / dead-letter queue

Redis keys:
    local:alarm:pending  - alarms waiting to be uploaded
    local:alarm:retry    - alarms waiting for a retry
    local:alarm:dead     - alarms that exhausted their retries
"""

import json
import logging
import os
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import redis
import requests

from config.settings import get_settings
from core.result_reporter import (
    REDIS_KEY_ALARM_PENDING,
    REDIS_KEY_ALARM_RETRY,
    REDIS_KEY_ALARM_DEAD,
)

logger = logging.getLogger(__name__)


class AlarmUploadWorker:
    """Alarm upload worker.

    Runs in a dedicated thread, consumes alarm payloads from the local Redis,
    uploads snapshots to COS and then reports alarm metadata to the cloud
    over HTTP.
    """

    def __init__(self):
        self._settings = get_settings()
        self._logger = logging.getLogger("alarm_upload_worker")
        self._redis: Optional[redis.Redis] = None
        self._cos_client = None  # created lazily on the first upload
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._stats: Dict[str, int] = {
            "processed": 0,
            "cos_uploaded": 0,
            "http_reported": 0,
            "retried": 0,
            "dead_lettered": 0,
            "errors": 0,
        }

    # ------------------------------------------------------------------ #
    # Lifecycle
    # ------------------------------------------------------------------ #

    def start(self):
        """Connect to Redis and start the worker thread.

        Does nothing if the thread is already running. If the Redis
        connection check fails the error is logged and no thread is started.
        """
        if self._thread and self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 已在运行")
            return

        redis_cfg = self._settings.local_redis
        try:
            self._redis = self._create_redis_client()
            self._redis.ping()
            self._logger.info(f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}")
        except Exception as e:
            self._logger.error(f"Worker Redis 连接失败: {e}")
            return

        # Probe cloud reachability; result is logged only.
        self._check_cloud_api()

        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="AlarmUploadWorker",
            daemon=True,
        )
        self._thread.start()
        self._logger.info("AlarmUploadWorker 已启动")

    def _check_cloud_api(self):
        """Probe the cloud ``/health`` endpoint at startup.

        Only logs the outcome; an unreachable cloud never blocks startup.
        """
        upload_cfg = self._settings.alarm_upload
        base_url = upload_cfg.cloud_api_url.rstrip("/")
        health_url = f"{base_url}/health"
        report_url = f"{base_url}/admin-api/aiot/alarm/edge/report"

        self._logger.info(f"云端 API 地址: {base_url}")
        self._logger.info(f"告警上报端点: {report_url}")

        try:
            resp = requests.get(health_url, timeout=5)
            if resp.status_code == 200:
                self._logger.info(f"云端健康检查通过: {health_url}")
            else:
                self._logger.warning(
                    f"云端健康检查异常: {health_url}, status={resp.status_code}"
                )
        except requests.ConnectionError:
            self._logger.warning(f"云端不可达: {health_url},请确认服务已启动")
        except Exception as e:
            self._logger.warning(f"云端健康检查失败: {e}")

    def stop(self):
        """Signal the worker thread to stop, wait for it, and close Redis."""
        if not self._thread or not self._thread.is_alive():
            return

        self._logger.info("正在停止 AlarmUploadWorker...")
        self._stop_event.set()

        # BRPOP uses a 2 s timeout and every back-off waits on the stop event,
        # so the thread should exit well within this join timeout.
        self._thread.join(timeout=10)

        if self._redis:
            try:
                self._redis.close()
            except Exception:
                pass  # best effort during shutdown

        self._logger.info("AlarmUploadWorker 已停止")

    # ------------------------------------------------------------------ #
    # Main loop
    # ------------------------------------------------------------------ #

    def _worker_loop(self):
        """Main loop: promote due retries, then consume pending alarms.

        Runs until the stop event is set. A Redis connection error triggers a
        reconnect. If a reconnect fails, ``self._redis`` is ``None`` and the
        loop keeps retrying every 5 seconds instead of calling ``brpop`` on
        ``None`` forever.

        NOTE: BRPOP removes the item before it is processed, so an item being
        processed when the process dies is not re-queued.
        """
        self._logger.info("Worker 主循环开始")

        while not self._stop_event.is_set():
            if self._redis is None:
                # A previous reconnect failed; keep trying until it succeeds
                # or we are asked to stop.
                if self._stop_event.wait(5):
                    break
                self._reconnect_redis()
                continue

            try:
                # Promote retry items whose back-off has expired first.
                self._process_retry_queue()

                # Short blocking timeout so the stop flag is re-checked often.
                result = self._redis.brpop(REDIS_KEY_ALARM_PENDING, timeout=2)
                if result is None:
                    continue

                _, alarm_json = result
                self._process_alarm(alarm_json)

            except redis.ConnectionError as e:
                self._logger.error(f"Redis 连接断开: {e}")
                # Interruptible back-off so stop() is not held up by the sleep.
                if self._stop_event.wait(5):
                    break
                self._reconnect_redis()
            except Exception as e:
                self._logger.exception(f"Worker 主循环异常: {e}")
                self._stats["errors"] += 1
                self._stop_event.wait(1)

        self._logger.info("Worker 主循环退出")

    # ------------------------------------------------------------------ #
    # Alarm processing
    # ------------------------------------------------------------------ #

    def _process_alarm(self, alarm_json: str):
        """Process one raw item popped from the pending queue.

        Items with ``_type == "resolve"`` are forwarded to
        :meth:`_process_resolve`. Everything else is a new alarm: its snapshot
        (if any) is uploaded to COS and its metadata reported over HTTP.
        A failure of the COS upload, the HTTP report, or the resolve call
        schedules a retry via :meth:`_handle_retry`.

        Args:
            alarm_json: JSON text of the queued item.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            self._logger.error(f"告警 JSON 解析失败: {e}")
            return
        if not isinstance(alarm_data, dict):
            # Valid JSON but not an object: the .get() calls below would raise.
            self._logger.error(f"告警数据格式错误(非 JSON 对象): {type(alarm_data).__name__}")
            return

        if alarm_data.get("_type") == "resolve":
            # Failed resolve events go through the same retry/dead-letter path.
            if not self._process_resolve(alarm_data):
                self._handle_retry(alarm_json, "告警结束上报失败")
            return

        alarm_id = alarm_data.get("alarm_id", "unknown")
        retry_count = alarm_data.get("_retry_count", 0)
        self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")

        # Step 1: upload the snapshot to COS (or fall back to a local key).
        snapshot_local_path = alarm_data.get("snapshot_local_path")
        object_key = None
        if snapshot_local_path:
            if self._wait_for_file(snapshot_local_path):
                object_key = self._upload_snapshot_to_cos(
                    snapshot_local_path,
                    alarm_id,
                    alarm_data.get("device_id", "unknown"),
                )
                if object_key is None:
                    # COS upload failed -> retry the whole alarm later.
                    self._handle_retry(alarm_json, "COS 上传失败")
                    return
                if object_key == "":
                    # COS not configured -> reference the local snapshot instead.
                    object_key = self._local_snapshot_key(snapshot_local_path)
                    self._logger.info(f"使用本地截图路径: {object_key}")
            else:
                self._logger.warning(f"截图文件不存在: {snapshot_local_path}")

        # Step 2: report alarm metadata over HTTP.
        report_data = {
            "alarm_id": alarm_data.get("alarm_id"),
            "alarm_type": alarm_data.get("alarm_type"),
            "device_id": alarm_data.get("device_id"),
            "scene_id": alarm_data.get("scene_id"),
            "event_time": alarm_data.get("event_time"),
            "alarm_level": alarm_data.get("alarm_level"),
            "snapshot_url": object_key or "",  # COS object key or "local:..." key
            "algorithm_code": alarm_data.get("algorithm_code"),
            "confidence_score": alarm_data.get("confidence_score"),
            "ext_data": alarm_data.get("ext_data", {}),
        }

        if not self._report_alarm_http(report_data):
            self._handle_retry(alarm_json, "HTTP 上报失败")
            return

        self._stats["processed"] += 1
        self._logger.info(f"告警上报成功: {alarm_id}")

        # Delete the local snapshot only after a real COS upload; the local
        # fallback key ("local:...") still points at the file.
        if (
            snapshot_local_path
            and os.path.exists(snapshot_local_path)
            and object_key
            and not object_key.startswith("local:")
        ):
            try:
                os.remove(snapshot_local_path)
                self._logger.debug(f"已删除本地截图: {snapshot_local_path}")
            except Exception as e:
                self._logger.warning(f"删除本地截图失败: {e}")

    @staticmethod
    def _wait_for_file(path: str, attempts: int = 6, interval: float = 0.5) -> bool:
        """Return True once ``path`` exists, polling up to ``attempts`` times.

        Snapshots are saved asynchronously, so the file may appear shortly
        after the alarm is queued (up to ~3 s with the defaults).
        """
        if os.path.exists(path):
            return True
        for _ in range(attempts):
            time.sleep(interval)
            if os.path.exists(path):
                return True
        return False

    @staticmethod
    def _local_snapshot_key(snapshot_local_path: str) -> str:
        """Build the ``local:<path>`` key used when COS is not configured.

        The path is made relative to ``data/captures`` (resolved against the
        current working directory) and normalized to forward slashes.
        """
        captures_base = os.path.join("data", "captures")
        try:
            rel_path = os.path.relpath(snapshot_local_path, captures_base)
        except ValueError:
            # On Windows relpath raises when the paths are on different
            # drives; fall back to the file name rather than losing the alarm.
            rel_path = os.path.basename(snapshot_local_path)
        rel_path = rel_path.replace("\\", "/")
        return f"local:{rel_path}"

    def _process_resolve(self, resolve_data: dict) -> bool:
        """POST an alarm-resolve event to the cloud.

        Args:
            resolve_data: Parsed resolve event. Only ``alarm_id``,
                ``duration_ms``, ``last_frame_time`` and ``resolve_type`` are sent.

        Returns:
            True if the cloud answered HTTP 200 with ``code == 0``, else False.
        """
        base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/")
        url = f"{base_url}/admin-api/aiot/alarm/edge/resolve"

        payload = {
            "alarm_id": resolve_data.get("alarm_id"),
            "duration_ms": resolve_data.get("duration_ms"),
            "last_frame_time": resolve_data.get("last_frame_time"),
            "resolve_type": resolve_data.get("resolve_type"),
        }

        try:
            response = requests.post(url, json=payload, headers=self._cloud_headers(), timeout=10)
            if response.status_code != 200:
                self._logger.warning(f"告警结束上报失败: status={response.status_code}")
                return False
            body = response.json()
            if body.get("code") == 0:
                self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}")
                return True
            self._logger.warning(f"告警结束上报业务错误: {body}")
            return False
        except Exception as e:
            self._logger.warning(f"告警结束上报异常: {e}")
            return False

    # ------------------------------------------------------------------ #
    # Cloud I/O
    # ------------------------------------------------------------------ #

    def _cloud_headers(self) -> Dict[str, str]:
        """JSON headers, plus a Bearer token when ``edge_token`` is configured."""
        headers = {"Content-Type": "application/json"}
        token = self._settings.alarm_upload.edge_token
        if token:
            headers["Authorization"] = f"Bearer {token}"
        return headers

    def _upload_snapshot_to_cos(
        self, local_path: str, alarm_id: str, device_id: str
    ) -> Optional[str]:
        """Upload a snapshot to Tencent Cloud COS.

        Args:
            local_path: Path of the local snapshot file.
            alarm_id: Alarm ID (used as the object file name).
            device_id: Device ID (used as a key prefix).

        Returns:
            The COS object key on success, ``""`` when COS is not configured,
            or ``None`` when client initialization or the upload fails.
        """
        cos_cfg = self._settings.cos
        if not cos_cfg.secret_id or not cos_cfg.bucket:
            self._logger.warning("COS 未配置,跳过截图上传")
            return ""

        # Lazily create the COS client (imported here so the SDK is only
        # needed when COS is actually configured).
        if self._cos_client is None:
            try:
                from qcloud_cos import CosConfig, CosS3Client

                config = CosConfig(
                    Region=cos_cfg.region,
                    SecretId=cos_cfg.secret_id,
                    SecretKey=cos_cfg.secret_key,
                    Scheme="https",
                )
                self._cos_client = CosS3Client(config)
                self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}")
            except Exception as e:
                self._logger.error(f"COS 客户端初始化失败: {e}")
                return None

        # Object key layout: alarms/{device_id}/{yyyy-MM-dd (UTC)}/{alarm_id}.jpg
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"

        try:
            self._cos_client.put_object_from_local_file(
                Bucket=cos_cfg.bucket,
                LocalFilePath=local_path,
                Key=object_key,
            )
            self._stats["cos_uploaded"] += 1
            self._logger.info(f"COS 上传成功: {object_key}")
            return object_key
        except Exception as e:
            self._logger.error(f"COS 上传失败: {e}")
            return None

    def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool:
        """POST alarm metadata to the cloud report endpoint.

        Args:
            alarm_data: Alarm metadata. Keys starting with ``_`` are internal
                control fields and are stripped before sending.

        Returns:
            True if the cloud answered HTTP 200 with ``code == 0``, else False.
        """
        base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/")
        url = f"{base_url}/admin-api/aiot/alarm/edge/report"
        headers = self._cloud_headers()

        report_data = {k: v for k, v in alarm_data.items() if not k.startswith("_")}

        self._logger.debug(f"HTTP 上报 URL: {url}")
        self._logger.debug(f"HTTP 上报数据: {report_data}")

        try:
            response = requests.post(
                url,
                json=report_data,
                headers=headers,
                timeout=10,
            )

            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._stats["http_reported"] += 1
                    return True
                self._logger.warning(
                    f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}"
                )
                return False

            # Log a truncated response body to help troubleshooting.
            resp_text = ""
            try:
                resp_text = response.text[:500]
            except Exception:
                pass
            self._logger.warning(
                f"HTTP 上报失败: url={url}, status={response.status_code}, "
                f"body={resp_text}"
            )
            return False

        except requests.Timeout:
            self._logger.warning(f"HTTP 上报超时: {url}")
            return False
        except requests.ConnectionError as e:
            self._logger.warning(f"HTTP 上报连接失败: {url}, error={e}")
            return False
        except Exception as e:
            self._logger.error(f"HTTP 上报异常: {url}, error={e}")
            return False

    # ------------------------------------------------------------------ #
    # Retry / dead-letter handling
    # ------------------------------------------------------------------ #

    def _handle_retry(self, alarm_json: str, error: str):
        """Schedule a retry with exponential back-off, or dead-letter the item.

        The item's ``_retry_count`` is incremented. Once it exceeds
        ``alarm_upload.retry_max`` the item is pushed to the dead-letter queue
        with ``_dead_reason``/``_dead_at``. Otherwise ``_retry_at`` is set to
        now + ``retry_interval * 2**(retry_count - 1)`` seconds and the item is
        pushed to the retry queue.

        Args:
            alarm_json: Original JSON text of the queued item.
            error: Human-readable failure reason (stored / logged).
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            self._logger.error(f"重试告警 JSON 解析失败,已丢弃: {e}")
            return

        retry_count = alarm_data.get("_retry_count", 0) + 1
        max_retry = self._settings.alarm_upload.retry_max

        if retry_count > max_retry:
            alarm_data["_dead_reason"] = error
            alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat()
            dead_json = json.dumps(alarm_data, ensure_ascii=False)
            self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json)
            self._stats["dead_lettered"] += 1
            self._logger.warning(
                f"告警进入死信队列: {alarm_data.get('alarm_id')}, "
                f"reason={error}, retries={retry_count - 1}"
            )
            return

        # Exponential back-off: base_interval * 2^(retry_count - 1)
        base_interval = self._settings.alarm_upload.retry_interval
        delay = base_interval * (2 ** (retry_count - 1))

        alarm_data["_retry_count"] = retry_count
        alarm_data["_retry_at"] = datetime.now(timezone.utc).timestamp() + delay

        retry_json = json.dumps(alarm_data, ensure_ascii=False)
        self._redis.lpush(REDIS_KEY_ALARM_RETRY, retry_json)
        self._stats["retried"] += 1
        self._logger.info(
            f"告警将重试: {alarm_data.get('alarm_id')}, "
            f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}"
        )

    def _process_retry_queue(self):
        """Move retry items whose back-off has expired to the pending queue.

        Pops from the tail (oldest first) at most as many items as the queue
        held at the start. Due items are LPUSHed onto the pending queue; the
        rest are LPUSHed back onto the head of the retry queue.
        """
        if not self._redis:
            return

        try:
            queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY)
            if queue_len == 0:
                return

            now = datetime.now(timezone.utc).timestamp()

            for _ in range(queue_len):
                item = self._redis.rpop(REDIS_KEY_ALARM_RETRY)
                if item is None:
                    break

                try:
                    data = json.loads(item)
                except json.JSONDecodeError as e:
                    self._logger.error(f"重试队列告警 JSON 解析失败,已丢弃: {e}")
                    continue

                if now >= data.get("_retry_at", 0):
                    # Due: hand back to the pending queue.
                    self._redis.lpush(REDIS_KEY_ALARM_PENDING, item)
                    self._logger.debug(
                        f"重试告警回归 pending: {data.get('alarm_id')}"
                    )
                else:
                    # Not yet due: put it back at the head of the retry queue.
                    self._redis.lpush(REDIS_KEY_ALARM_RETRY, item)

        except Exception as e:
            self._logger.error(f"处理重试队列异常: {e}")

    # ------------------------------------------------------------------ #
    # Redis helpers / stats
    # ------------------------------------------------------------------ #

    def _create_redis_client(self) -> redis.Redis:
        """Build a Redis client from the ``local_redis`` settings."""
        redis_cfg = self._settings.local_redis
        return redis.Redis(
            host=redis_cfg.host,
            port=redis_cfg.port,
            db=redis_cfg.db,
            password=redis_cfg.password,
            decode_responses=True,
            socket_connect_timeout=5,
        )

    def _reconnect_redis(self):
        """Replace the Redis client with a fresh, pinged connection.

        On failure ``self._redis`` is set to ``None``; the worker loop detects
        that and keeps retrying.
        """
        old_client = self._redis
        if old_client is not None:
            try:
                old_client.close()
            except Exception:
                pass  # best effort: the old connection is already broken

        try:
            self._redis = self._create_redis_client()
            self._redis.ping()
            self._logger.info("Redis 重连成功")
        except Exception as e:
            self._logger.error(f"Redis 重连失败: {e}")
            self._redis = None

    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of the worker counters plus a ``running`` flag."""
        stats = self._stats.copy()
        stats["running"] = self._thread is not None and self._thread.is_alive()
        return stats