""" 截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS 数据流: WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP (回调失败时降级写 Redis snap:result:{id}) """ import io import json import logging import threading import time import uuid from typing import Optional import cv2 import numpy as np import requests from config.settings import get_settings, COSConfig logger = logging.getLogger(__name__) # Redis 常量 SNAP_REQUEST_STREAM = "edge_snap_request" SNAP_CONSUMER_GROUP = "edge_snap_group" SNAP_CONSUMER_NAME = "edge_snap_worker" SNAP_RESULT_KEY_PREFIX = "snap:result:" SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期 # HTTP 回调 SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback" SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒) class ScreenshotHandler: """截图处理器 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, JPEG 编码后直传 COS,最后通过 HTTP 回调 WVP 返回结果 URL。 回调失败时降级写 Redis。 """ def __init__(self, cloud_redis, stream_manager): """ Args: cloud_redis: 云端 Redis 客户端 (redis.Redis) stream_manager: MultiStreamManager 实例 """ self._cloud_redis = cloud_redis self._stream_manager = stream_manager self._settings = get_settings() self._cos_client = None self._cos_config: COSConfig = self._settings.cos self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() # ==================== 生命周期 ==================== def start(self): """启动截图监听线程""" if not self._cloud_redis: logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动") return if not self._cos_config.secret_id or not self._cos_config.bucket: logger.warning("[截图] COS 配置不完整,截图处理器未启动") return self._init_cos_client() self._ensure_consumer_group() self._stop_event.clear() self._thread = threading.Thread( target=self._listen_loop, name="ScreenshotHandler", daemon=True, ) self._thread.start() logger.info("[截图] 截图处理器已启动") def stop(self): """停止截图监听线程""" self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) logger.info("[截图] 截图处理器已停止") # ==================== COS 初始化 ==================== def _init_cos_client(self): """初始化 COS 客户端""" try: from qcloud_cos import CosConfig, CosS3Client config = CosConfig( Region=self._cos_config.region, SecretId=self._cos_config.secret_id, SecretKey=self._cos_config.secret_key, ) self._cos_client = CosS3Client(config) logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s", self._cos_config.bucket, self._cos_config.region) except Exception as e: logger.error("[截图] COS 客户端初始化失败: %s", e) # ==================== Consumer Group ==================== def _ensure_consumer_group(self): """确保 Redis Consumer Group 存在""" try: self._cloud_redis.xgroup_create( SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True ) logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP) except Exception as e: # BUSYGROUP = group already exists, safe to ignore if "BUSYGROUP" in str(e): logger.debug("[截图] consumer group 已存在") else: logger.error("[截图] 创建 consumer group 失败: %s", e) # ==================== 主循环 ==================== def _listen_loop(self): """XREADGROUP 循环消费截图请求""" backoff = 5 max_backoff = 60 while not self._stop_event.is_set(): try: results = self._cloud_redis.xreadgroup( SNAP_CONSUMER_GROUP, SNAP_CONSUMER_NAME, {SNAP_REQUEST_STREAM: ">"}, count=1, block=5000, ) if not results: continue backoff = 5 # 重置退避 for stream_name, messages in results: for msg_id, fields in messages: try: self._handle_request(fields) except Exception as e: logger.error("[截图] 处理请求失败: %s", e) finally: # ACK 消息 try: self._cloud_redis.xack( SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id ) except Exception: pass except Exception as e: if self._stop_event.is_set(): return logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) self._stop_event.wait(backoff) backoff = min(backoff * 2, max_backoff) # ==================== 请求处理 ==================== def _handle_request(self, fields: dict): """处理单个截图请求""" request_id = fields.get("request_id", "") camera_code = fields.get("camera_code", "") cos_path = fields.get("cos_path", "") callback_url = fields.get("callback_url", "") if not request_id or not camera_code or not cos_path: logger.warning("[截图] 请求字段不完整: %s", fields) self._send_result(callback_url, request_id, camera_code, { "status": "error", "message": "请求字段不完整", }) return rtsp_url = fields.get("rtsp_url", "") logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", request_id, camera_code, callback_url or "(无)") # 1. 抓帧 jpeg_bytes = self._capture_frame(camera_code, rtsp_url) if jpeg_bytes is None: self._send_result(callback_url, request_id, camera_code, { "status": "error", "message": f"摄像头流未连接: {camera_code}", }) return # 2. 上传 COS(失败重试 1 次) url = self._upload_to_cos(jpeg_bytes, cos_path) if url is None: # 重试一次 logger.info("[截图] COS 上传失败,重试一次...") url = self._upload_to_cos(jpeg_bytes, cos_path) if url is None: self._send_result(callback_url, request_id, camera_code, { "status": "error", "message": "COS 上传失败", }) return # 3. 发送结果 self._send_result(callback_url, request_id, camera_code, { "status": "ok", "url": url, }) logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) # ==================== 抓帧 ==================== def _capture_frame(self, camera_code: str, rtsp_url: str = "") -> Optional[bytes]: """从 MultiStreamManager 获取最新帧,无流时降级临时 RTSP 连接""" # 优先从已有视频流获取(有 ROI 的摄像头,已连接) stream = self._stream_manager.get_stream(camera_code) if stream is not None and stream.is_connected: frame = stream.get_latest_frame(timeout=2.0) if frame is None: frame = stream.read(timeout=2.0) if frame is not None: return self._encode_jpeg(frame.image) # 降级:临时 RTSP 连接抓帧(无 ROI 或流未连接的摄像头) if rtsp_url: return self._capture_ondemand(camera_code, rtsp_url) logger.warning("[截图] 未找到摄像头流且无 rtsp_url: %s", camera_code) return None def _capture_ondemand(self, camera_code: str, rtsp_url: str) -> Optional[bytes]: """临时连接 RTSP 抓取一帧,用完即断""" cap = None try: logger.info("[截图] 临时连接 RTSP: %s → %s", camera_code, rtsp_url) cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if not cap.isOpened(): logger.warning("[截图] 临时 RTSP 连接失败: %s", camera_code) return None # 读取几帧丢弃(跳过关键帧解码延迟),取最后一帧 frame = None for _ in range(5): ret, f = cap.read() if ret and f is not None: frame = f if frame is None: logger.warning("[截图] 临时连接读帧失败: %s", camera_code) return None return self._encode_jpeg(frame) except Exception as e: logger.error("[截图] 临时截图异常: %s, %s", camera_code, e) return None finally: if cap is not None: cap.release() logger.debug("[截图] 临时 RTSP 连接已释放: %s", camera_code) def _encode_jpeg(self, image) -> Optional[bytes]: """将帧编码为 JPEG 字节""" try: success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, 85]) return buffer.tobytes() if success else None except Exception as e: logger.error("[截图] JPEG 编码失败: %s", e) return None # ==================== COS 上传 ==================== # 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL(300s) PRESIGNED_URL_EXPIRES = 3600 def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: """上传 JPEG 到 COS,返回预签名访问 URL""" if not self._cos_client: logger.error("[截图] COS 客户端未初始化") return None try: body = io.BytesIO(jpeg_bytes) self._cos_client.put_object( Bucket=self._cos_config.bucket, Body=body, Key=cos_path, ContentType="image/jpeg", ) # 生成预签名 URL(不附加额外 Params,保持 URL 简洁) url = self._cos_client.get_presigned_url( Method="GET", Bucket=self._cos_config.bucket, Key=cos_path, Expired=self.PRESIGNED_URL_EXPIRES, ) return url except Exception as e: logger.error("[截图] COS 上传失败: %s", e) return None # ==================== 结果发送(回调 + 降级) ==================== def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict): """发送截图结果:优先 HTTP 回调,失败则降级写 Redis""" if not request_id: return # 构造回调 body body = { "request_id": request_id, "camera_code": camera_code, **data, } # 优先尝试 HTTP 回调 if callback_url: try: url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT, proxies={"http": None, "https": None}) if resp.status_code < 300: logger.info("[截图] HTTP 回调成功: request_id=%s", request_id) return else: logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id) except Exception as e: logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e) # 降级:写 Redis result key self._write_result_redis(request_id, body) def _write_result_redis(self, request_id: str, data: dict): """降级:将结果写入 Redis key(TTL 60s)""" key = SNAP_RESULT_KEY_PREFIX + request_id try: self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id) except Exception as e: logger.error("[截图] 降级写 Redis 也失败: %s", e)