"""
Alarm upload worker.

Runs in a dedicated thread and consumes alarms from a local Redis queue:

1. BRPOP the next pending alarm.
2. Upload the snapshot image to Tencent Cloud COS.
3. HTTP POST the alarm metadata to the cloud API.
4. On failure, retry with exponential backoff; once the retry budget is
   exhausted the alarm is moved to the dead-letter queue.

Redis keys:
    local:alarm:pending - alarms waiting to be uploaded
    local:alarm:retry   - alarms scheduled for a later retry
    local:alarm:dead    - dead-lettered alarms
"""

import base64
import json
import logging
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import redis
import requests

from config.settings import get_settings
from core.result_reporter import (
    REDIS_KEY_ALARM_PENDING,
    REDIS_KEY_ALARM_RETRY,
    REDIS_KEY_ALARM_DEAD,
)

logger = logging.getLogger(__name__)


class AlarmUploadWorker:
    """Background worker that uploads alarms to the cloud.

    Consumes alarm JSON payloads from the local Redis pending queue, uploads
    the snapshot (if any) to COS, then reports the alarm metadata over HTTP.
    Failed alarms are re-queued with exponential backoff and eventually
    dead-lettered.
    """

    # Minimum number of seconds between two scans of the retry queue. A scan
    # costs one Redis round trip per queued item, so it should not run after
    # every single processed alarm.
    _RETRY_SCAN_INTERVAL = 1.0

    def __init__(self):
        self._settings = get_settings()
        self._logger = logging.getLogger("alarm_upload_worker")
        self._redis: Optional[redis.Redis] = None
        self._cos_client = None  # created lazily on first upload
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # time.monotonic() of the last retry-queue scan; -inf forces a scan
        # on the first loop iteration.
        self._last_retry_scan = float("-inf")
        self._stats = {
            "processed": 0,
            "cos_uploaded": 0,
            "http_reported": 0,
            "retried": 0,
            "dead_lettered": 0,
            "errors": 0,
        }

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #

    def _create_redis_client(self) -> redis.Redis:
        """Build a Redis client from ``local_redis`` settings and PING it.

        Raises:
            redis.RedisError: if the server cannot be reached.
        """
        redis_cfg = self._settings.local_redis
        client = redis.Redis(
            host=redis_cfg.host,
            port=redis_cfg.port,
            db=redis_cfg.db,
            password=redis_cfg.password,
            decode_responses=True,
            socket_connect_timeout=5,
        )
        client.ping()
        return client

    def _cloud_base_url(self) -> str:
        """Return the configured cloud API base URL without a trailing slash."""
        return self._settings.alarm_upload.cloud_api_url.rstrip("/")

    def _auth_headers(self) -> Dict[str, str]:
        """Return JSON request headers, plus a Bearer token when configured."""
        headers = {"Content-Type": "application/json"}
        edge_token = self._settings.alarm_upload.edge_token
        if edge_token:
            headers["Authorization"] = f"Bearer {edge_token}"
        return headers

    # ------------------------------------------------------------------ #
    # Lifecycle
    # ------------------------------------------------------------------ #

    def start(self):
        """Connect to Redis, probe the cloud API and launch the worker thread.

        Does nothing if the thread is already running. If Redis cannot be
        reached the error is logged and the worker is not started.
        """
        if self._thread and self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 已在运行")
            return

        redis_cfg = self._settings.local_redis
        try:
            self._redis = self._create_redis_client()
            self._logger.info(
                f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}"
            )
        except Exception as e:
            self._logger.error(f"Worker Redis 连接失败: {e}")
            return

        # Probe the cloud API on startup (logging only, never blocks startup).
        self._check_cloud_api()

        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="AlarmUploadWorker",
            daemon=True,
        )
        self._thread.start()
        self._logger.info("AlarmUploadWorker 已启动")

    def _check_cloud_api(self):
        """Log whether the cloud ``/health`` endpoint answers (never raises)."""
        base_url = self._cloud_base_url()
        health_url = f"{base_url}/health"
        report_url = f"{base_url}/api/ai/alert/edge/report"

        self._logger.info(f"云端 API 地址: {base_url}")
        self._logger.info(f"告警上报端点: {report_url}")

        try:
            resp = requests.get(health_url, timeout=5)
            if resp.status_code == 200:
                self._logger.info(f"云端健康检查通过: {health_url}")
            else:
                self._logger.warning(
                    f"云端健康检查异常: {health_url}, status={resp.status_code}"
                )
        except requests.ConnectionError:
            self._logger.warning(f"云端不可达: {health_url},请确认服务已启动")
        except Exception as e:
            self._logger.warning(f"云端健康检查失败: {e}")

    def stop(self):
        """Signal the worker thread to exit, wait up to 10 s, close Redis."""
        if not self._thread or not self._thread.is_alive():
            return

        self._logger.info("正在停止 AlarmUploadWorker...")
        self._stop_event.set()

        # BRPOP uses a 2 s timeout and all back-off waits observe the stop
        # event, so the loop normally exits well within this timeout.
        self._thread.join(timeout=10)
        if self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 未能在超时内退出")

        if self._redis:
            try:
                self._redis.close()
            except Exception:
                pass

        self._logger.info("AlarmUploadWorker 已停止")

    # ------------------------------------------------------------------ #
    # Main loop
    # ------------------------------------------------------------------ #

    def _worker_loop(self):
        """Main loop: move due retries back to pending, then consume pending."""
        self._logger.info("Worker 主循环开始")

        while not self._stop_event.is_set():
            try:
                if self._redis is None:
                    # A previous reconnect failed. Without this check the
                    # loop would keep failing on ``None.brpop`` inside the
                    # generic handler below and never try to reconnect.
                    self._reconnect_redis()
                    if self._redis is None:
                        self._stop_event.wait(5)
                        continue

                self._maybe_process_retry_queue()

                # Block for at most 2 s so stop requests are noticed quickly.
                result = self._redis.brpop(REDIS_KEY_ALARM_PENDING, timeout=2)
                if result is None:
                    continue

                _, alarm_json = result
                self._process_alarm(alarm_json)

            except redis.ConnectionError as e:
                self._logger.error(f"Redis 连接断开: {e}")
                # Interruptible back-off; don't reconnect if we are stopping.
                if self._stop_event.wait(5):
                    break
                self._reconnect_redis()
            except Exception as e:
                self._logger.error(f"Worker 主循环异常: {e}")
                self._stats["errors"] += 1
                self._stop_event.wait(1)

        self._logger.info("Worker 主循环退出")

    def _maybe_process_retry_queue(self):
        """Run ``_process_retry_queue`` at most once per ``_RETRY_SCAN_INTERVAL``."""
        now = time.monotonic()
        if now - self._last_retry_scan < self._RETRY_SCAN_INTERVAL:
            return
        self._last_retry_scan = now
        self._process_retry_queue()

    # ------------------------------------------------------------------ #
    # Alarm processing
    # ------------------------------------------------------------------ #

    def _process_alarm(self, alarm_json: str):
        """Process one pending payload: COS upload + HTTP report, or retry.

        Payloads whose ``_type`` is ``"resolve"`` are delegated to
        ``_process_resolve``. Payloads that are not valid JSON are logged,
        counted as errors and dropped.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            self._logger.error(f"告警 JSON 解析失败: {e}")
            self._stats["errors"] += 1
            return

        if alarm_data.get("_type") == "resolve":
            self._process_resolve(alarm_data)
            return

        alarm_id = alarm_data.get("alarm_id", "unknown")
        retry_count = alarm_data.get("_retry_count", 0)
        self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")

        # Step 1: upload the snapshot (base64 in the payload) to COS, unless
        # a previous attempt already did so and stored the object key.
        object_key = alarm_data.get("_cos_object_key")
        snapshot_b64 = alarm_data.get("snapshot_b64")
        if snapshot_b64 and not object_key:
            try:
                image_bytes = base64.b64decode(snapshot_b64)
                object_key = self._upload_snapshot_to_cos(
                    image_bytes,
                    alarm_id,
                    alarm_data.get("device_id", "unknown"),
                )
            except Exception as e:
                self._logger.error(f"截图解码/上传失败: {e}")
                self._handle_retry(alarm_json, f"截图处理失败: {e}")
                return

            if object_key is None:
                # COS upload failed: schedule a retry.
                self._handle_retry(alarm_json, "COS 上传失败")
                return

            # Remember the uploaded key so that a retry caused by a failed
            # HTTP report does not upload the same image again.
            alarm_data["_cos_object_key"] = object_key
            alarm_json = json.dumps(alarm_data, ensure_ascii=False)

        # Step 2: report alarm metadata (no binary image data) over HTTP.
        report_data = {
            "alarm_id": alarm_data.get("alarm_id"),
            "alarm_type": alarm_data.get("alarm_type"),
            "device_id": alarm_data.get("device_id"),
            "scene_id": alarm_data.get("scene_id"),
            "event_time": alarm_data.get("event_time"),
            "alarm_level": alarm_data.get("alarm_level"),
            "snapshot_url": object_key or "",  # COS object key, not a full URL
            "algorithm_code": alarm_data.get("algorithm_code"),
            "confidence_score": alarm_data.get("confidence_score"),
            "ext_data": alarm_data.get("ext_data", {}),
        }

        if self._report_alarm_http(report_data):
            self._stats["processed"] += 1
            self._logger.info(f"告警上报成功: {alarm_id}")
        else:
            # HTTP report failed: schedule a retry.
            self._handle_retry(alarm_json, "HTTP 上报失败")

    def _process_resolve(self, resolve_data: dict):
        """POST an alarm-resolved event to the cloud.

        Failures are only logged; resolve events are not retried.
        """
        url = f"{self._cloud_base_url()}/api/ai/alert/edge/resolve"
        headers = self._auth_headers()

        payload = {
            "alarm_id": resolve_data.get("alarm_id"),
            "duration_ms": resolve_data.get("duration_ms"),
            "last_frame_time": resolve_data.get("last_frame_time"),
            "resolve_type": resolve_data.get("resolve_type"),
        }

        try:
            response = requests.post(url, json=payload, headers=headers, timeout=10)
            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}")
                else:
                    self._logger.warning(f"告警结束上报业务错误: {body}")
            else:
                self._logger.warning(f"告警结束上报失败: status={response.status_code}")
        except Exception as e:
            self._logger.warning(f"告警结束上报异常: {e}")

    def _upload_snapshot_to_cos(
        self, image_bytes: bytes, alarm_id: str, device_id: str
    ) -> Optional[str]:
        """Upload a snapshot to Tencent Cloud COS directly from memory.

        Args:
            image_bytes: Image bytes, uploaded with ContentType image/jpeg.
            alarm_id: Alarm ID, used as the object file name.
            device_id: Device ID, used as a key path segment.

        Returns:
            The COS object key ``alarms/{device_id}/{YYYY-MM-DD}/{alarm_id}.jpg``
            (date in UTC), or ``None`` on any failure.
        """
        cos_cfg = self._settings.cos
        if not cos_cfg.secret_id or not cos_cfg.bucket:
            self._logger.error("COS 未配置(缺少 secret_id 或 bucket),无法上传截图")
            return None

        # Lazily create the COS client; the SDK import is deferred as well.
        if self._cos_client is None:
            try:
                from qcloud_cos import CosConfig, CosS3Client

                config = CosConfig(
                    Region=cos_cfg.region,
                    SecretId=cos_cfg.secret_id,
                    SecretKey=cos_cfg.secret_key,
                    Scheme="https",
                )
                self._cos_client = CosS3Client(config)
                self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}")
            except Exception as e:
                self._logger.error(f"COS 客户端初始化失败: {e}")
                return None

        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"

        try:
            self._cos_client.put_object(
                Bucket=cos_cfg.bucket,
                Body=image_bytes,
                Key=object_key,
                ContentType="image/jpeg",
            )
            self._stats["cos_uploaded"] += 1
            self._logger.info(f"COS 上传成功: {object_key}")
            return object_key
        except Exception as e:
            self._logger.error(f"COS 上传失败: {e}")
            return None

    def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool:
        """POST alarm metadata to the cloud report endpoint.

        Keys starting with ``_`` are internal control fields and are not sent.

        Args:
            alarm_data: Alarm metadata.

        Returns:
            True only for HTTP 200 with a JSON body whose ``code`` is 0.
        """
        url = f"{self._cloud_base_url()}/api/ai/alert/edge/report"
        headers = self._auth_headers()

        report_data = {k: v for k, v in alarm_data.items() if not k.startswith("_")}

        self._logger.debug(f"HTTP 上报 URL: {url}")
        self._logger.debug(f"HTTP 上报数据: {report_data}")

        try:
            response = requests.post(
                url,
                json=report_data,
                headers=headers,
                timeout=10,
            )
            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._stats["http_reported"] += 1
                    return True
                self._logger.warning(
                    f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}"
                )
                return False

            # Log a truncated response body to help diagnose failures.
            resp_text = ""
            try:
                resp_text = response.text[:500]
            except Exception:
                pass
            self._logger.warning(
                f"HTTP 上报失败: url={url}, status={response.status_code}, "
                f"body={resp_text}"
            )
            return False

        except requests.Timeout:
            self._logger.warning(f"HTTP 上报超时: {url}")
            return False
        except requests.ConnectionError as e:
            self._logger.warning(f"HTTP 上报连接失败: {url}, error={e}")
            return False
        except Exception as e:
            self._logger.error(f"HTTP 上报异常: {url}, error={e}")
            return False

    # ------------------------------------------------------------------ #
    # Retry handling
    # ------------------------------------------------------------------ #

    def _handle_retry(self, alarm_json: str, error: str):
        """Schedule a retry with exponential backoff, or dead-letter the alarm.

        The delay is ``retry_interval * 2 ** (retry_count - 1)`` seconds. The
        due time is stored in ``_retry_at`` as a UTC epoch timestamp. Once
        ``_retry_count`` would exceed ``retry_max``, the payload is pushed to
        the dead-letter queue with ``_dead_reason`` and ``_dead_at`` set.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError:
            return

        retry_count = alarm_data.get("_retry_count", 0) + 1
        max_retry = self._settings.alarm_upload.retry_max

        if retry_count > max_retry:
            alarm_data["_dead_reason"] = error
            alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat()
            dead_json = json.dumps(alarm_data, ensure_ascii=False)
            self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json)
            self._stats["dead_lettered"] += 1
            self._logger.warning(
                f"告警进入死信队列: {alarm_data.get('alarm_id')}, "
                f"reason={error}, retries={retry_count - 1}"
            )
            return

        base_interval = self._settings.alarm_upload.retry_interval
        delay = base_interval * (2 ** (retry_count - 1))

        alarm_data["_retry_count"] = retry_count
        alarm_data["_retry_at"] = datetime.now(timezone.utc).timestamp() + delay

        retry_json = json.dumps(alarm_data, ensure_ascii=False)
        self._redis.lpush(REDIS_KEY_ALARM_RETRY, retry_json)
        self._stats["retried"] += 1

        self._logger.info(
            f"告警将重试: {alarm_data.get('alarm_id')}, "
            f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}"
        )

    def _process_retry_queue(self):
        """Move retry-queue items whose ``_retry_at`` has passed to pending.

        Each item is visited once by rotating the list with RPOPLPUSH, so an
        item is never held only in process memory. A due item is then moved
        to the pending queue with an atomic LREM + LPUSH transaction.
        Unparseable items are removed and logged.
        """
        if not self._redis:
            return

        try:
            queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY)
            if queue_len == 0:
                return

            now = datetime.now(timezone.utc).timestamp()

            for _ in range(queue_len):
                # Atomically rotate the oldest item (right end) to the head;
                # it stays in the retry list the whole time.
                item = self._redis.rpoplpush(REDIS_KEY_ALARM_RETRY, REDIS_KEY_ALARM_RETRY)
                if item is None:
                    break

                try:
                    data = json.loads(item)
                except json.JSONDecodeError:
                    data = None
                if not isinstance(data, dict):
                    self._redis.lrem(REDIS_KEY_ALARM_RETRY, 1, item)
                    self._stats["errors"] += 1
                    self._logger.warning(
                        f"重试队列中存在无法解析的告警,已丢弃: {item[:200]}"
                    )
                    continue

                if now >= data.get("_retry_at", 0):
                    # Due: move from retry to pending atomically.
                    pipe = self._redis.pipeline(transaction=True)
                    pipe.lrem(REDIS_KEY_ALARM_RETRY, 1, item)
                    pipe.lpush(REDIS_KEY_ALARM_PENDING, item)
                    pipe.execute()
                    self._logger.debug(
                        f"重试告警回归 pending: {data.get('alarm_id')}"
                    )
                # Not yet due: already back in the retry list via the rotation.

        except Exception as e:
            self._logger.error(f"处理重试队列异常: {e}")

    def _reconnect_redis(self):
        """Replace the Redis client; on failure set ``self._redis`` to None."""
        old_client = self._redis
        self._redis = None
        if old_client is not None:
            try:
                old_client.close()
            except Exception:
                pass

        try:
            self._redis = self._create_redis_client()
            self._logger.info("Redis 重连成功")
        except Exception as e:
            self._logger.error(f"Redis 重连失败: {e}")
            self._redis = None

    def get_statistics(self) -> Dict[str, Any]:
        """Return a copy of the worker counters plus a ``running`` flag."""
        stats = self._stats.copy()
        stats["running"] = self._thread is not None and self._thread.is_alive()
        return stats