From 6d1e0e4a5e99cfc1feefa2fde1e330f8ab2f7425 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 15:48:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(edge):=20=E6=88=AA=E5=9B=BE=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E6=94=B9=E4=B8=BAHTTP=E5=9B=9E=E8=B0=83=EF=BC=8CCOS?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E9=A2=84=E7=AD=BE=E5=90=8DURL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 截图完成后优先通过HTTP回调WVP返回结果,回调失败降级写Redis - COS上传后生成预签名URL(1小时有效期),不附加额外Params - 移除Edge端缓存逻辑(缓存由WVP端统一管理) Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 96 ++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 8a38ff4..238a435 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -2,7 +2,8 @@ 截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS 数据流: - WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → SET snap:result:{id} + WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP + (回调失败时降级写 Redis snap:result:{id}) """ import io @@ -15,6 +16,7 @@ from typing import Optional import cv2 import numpy as np +import requests from config.settings import get_settings, COSConfig @@ -25,16 +27,19 @@ SNAP_REQUEST_STREAM = "edge_snap_request" SNAP_CONSUMER_GROUP = "edge_snap_group" SNAP_CONSUMER_NAME = "edge_snap_worker" SNAP_RESULT_KEY_PREFIX = "snap:result:" -SNAP_CACHE_KEY_PREFIX = "snap:cache:" -SNAP_RESULT_TTL = 60 # 结果 key 60s 过期 -SNAP_CACHE_TTL = 300 # 缓存 key 5min 过期 +SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期 + +# HTTP 回调 +SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback" +SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒) class ScreenshotHandler: """截图处理器 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, - JPEG 编码后直传 COS,最后将结果 URL 写回 Redis。 + JPEG 编码后直传 COS,最后通过 HTTP 回调 WVP 返回结果 URL。 + 回调失败时降级写 Redis。 """ def __init__(self, cloud_redis, stream_manager): @@ -166,20 +171,25 @@ class ScreenshotHandler: request_id = fields.get("request_id", "") camera_code = fields.get("camera_code", "") cos_path = fields.get("cos_path", "") + callback_url = fields.get("callback_url", "") if not request_id or not camera_code or not cos_path: logger.warning("[截图] 请求字段不完整: %s", fields) - self._write_result(request_id, {"status": "error", "message": "请求字段不完整"}) + self._send_result(callback_url, request_id, camera_code, { + "status": "error", + "message": "请求字段不完整", + }) return - logger.info("[截图] 收到截图请求: request_id=%s, camera=%s", request_id, camera_code) + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", + request_id, camera_code, callback_url or "(无)") # 1. 抓帧 jpeg_bytes = self._capture_frame(camera_code) if jpeg_bytes is None: - self._write_result(request_id, { + self._send_result(callback_url, request_id, camera_code, { "status": "error", - "message": f"摄像头流未连接: {camera_code}" + "message": f"摄像头流未连接: {camera_code}", }) return @@ -191,23 +201,17 @@ class ScreenshotHandler: url = self._upload_to_cos(jpeg_bytes, cos_path) if url is None: - self._write_result(request_id, { + self._send_result(callback_url, request_id, camera_code, { "status": "error", - "message": "COS 上传失败" + "message": "COS 上传失败", }) return - # 3. 写结果 - result = {"status": "ok", "url": url} - self._write_result(request_id, result) - - # 4. 写缓存 - cache_key = SNAP_CACHE_KEY_PREFIX + camera_code - cache_data = json.dumps({"url": url, "timestamp": int(time.time())}) - try: - self._cloud_redis.set(cache_key, cache_data, ex=SNAP_CACHE_TTL) - except Exception as e: - logger.warning("[截图] 写入缓存失败: %s", e) + # 3. 发送结果 + self._send_result(callback_url, request_id, camera_code, { + "status": "ok", + "url": url, + }) logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) @@ -247,8 +251,11 @@ class ScreenshotHandler: # ==================== COS 上传 ==================== + # 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL(300s) + PRESIGNED_URL_EXPIRES = 3600 + def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: - """上传 JPEG 到 COS,返回访问 URL""" + """上传 JPEG 到 COS,返回预签名访问 URL""" if not self._cos_client: logger.error("[截图] COS 客户端未初始化") return None @@ -261,22 +268,53 @@ class ScreenshotHandler: Key=cos_path, ContentType="image/jpeg", ) - url = "https://{}.cos.{}.myqcloud.com/{}".format( - self._cos_config.bucket, self._cos_config.region, cos_path + # 生成预签名 URL(不附加额外 Params,保持 URL 简洁) + url = self._cos_client.get_presigned_url( + Method="GET", + Bucket=self._cos_config.bucket, + Key=cos_path, + Expired=self.PRESIGNED_URL_EXPIRES, ) return url except Exception as e: logger.error("[截图] COS 上传失败: %s", e) return None - # ==================== Redis 结果 ==================== + # ==================== 结果发送(回调 + 降级) ==================== - def _write_result(self, request_id: str, data: dict): - """将结果写入 Redis key(TTL 60s)""" + def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict): + """发送截图结果:优先 HTTP 回调,失败则降级写 Redis""" if not request_id: return + + # 构造回调 body + body = { + "request_id": request_id, + "camera_code": camera_code, + **data, + } + + # 优先尝试 HTTP 回调 + if callback_url: + try: + url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH + resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT) + if resp.status_code < 300: + logger.info("[截图] HTTP 回调成功: request_id=%s", request_id) + return + else: + logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id) + except Exception as e: + logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e) + + # 降级:写 Redis result key + self._write_result_redis(request_id, body) + + def _write_result_redis(self, request_id: str, data: dict): + """降级:将结果写入 Redis key(TTL 60s)""" key = SNAP_RESULT_KEY_PREFIX + request_id try: self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) + logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id) except Exception as e: - logger.error("[截图] 写入结果失败: %s", e) + logger.error("[截图] 降级写 Redis 也失败: %s", e)