feat(edge): 截图响应改为HTTP回调,COS使用预签名URL

- 截图完成后优先通过HTTP回调WVP返回结果,回调失败降级写Redis
- COS上传后生成预签名URL(1小时有效期),不附加额外Params
- 移除Edge端缓存逻辑(缓存由WVP端统一管理)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 15:48:12 +08:00
parent 9ec949ef02
commit 6d1e0e4a5e

View File

@@ -2,7 +2,8 @@
截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS 截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS
数据流: 数据流:
WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → SET snap:result:{id} WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP
(回调失败时降级写 Redis snap:result:{id}
""" """
import io import io
@@ -15,6 +16,7 @@ from typing import Optional
import cv2 import cv2
import numpy as np import numpy as np
import requests
from config.settings import get_settings, COSConfig from config.settings import get_settings, COSConfig
@@ -25,16 +27,19 @@ SNAP_REQUEST_STREAM = "edge_snap_request"
SNAP_CONSUMER_GROUP = "edge_snap_group" SNAP_CONSUMER_GROUP = "edge_snap_group"
SNAP_CONSUMER_NAME = "edge_snap_worker" SNAP_CONSUMER_NAME = "edge_snap_worker"
SNAP_RESULT_KEY_PREFIX = "snap:result:" SNAP_RESULT_KEY_PREFIX = "snap:result:"
SNAP_CACHE_KEY_PREFIX = "snap:cache:" SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期
SNAP_RESULT_TTL = 60 # 结果 key 60s 过期
SNAP_CACHE_TTL = 300 # 缓存 key 5min 过期 # HTTP 回调
SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback"
SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒)
class ScreenshotHandler: class ScreenshotHandler:
"""截图处理器 """截图处理器
从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧,
JPEG 编码后直传 COS最后将结果 URL 写回 Redis JPEG 编码后直传 COS最后通过 HTTP 回调 WVP 返回结果 URL
回调失败时降级写 Redis。
""" """
def __init__(self, cloud_redis, stream_manager): def __init__(self, cloud_redis, stream_manager):
@@ -166,20 +171,25 @@ class ScreenshotHandler:
request_id = fields.get("request_id", "") request_id = fields.get("request_id", "")
camera_code = fields.get("camera_code", "") camera_code = fields.get("camera_code", "")
cos_path = fields.get("cos_path", "") cos_path = fields.get("cos_path", "")
callback_url = fields.get("callback_url", "")
if not request_id or not camera_code or not cos_path: if not request_id or not camera_code or not cos_path:
logger.warning("[截图] 请求字段不完整: %s", fields) logger.warning("[截图] 请求字段不完整: %s", fields)
self._write_result(request_id, {"status": "error", "message": "请求字段不完整"}) self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": "请求字段不完整",
})
return return
logger.info("[截图] 收到截图请求: request_id=%s, camera=%s", request_id, camera_code) logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s",
request_id, camera_code, callback_url or "(无)")
# 1. 抓帧 # 1. 抓帧
jpeg_bytes = self._capture_frame(camera_code) jpeg_bytes = self._capture_frame(camera_code)
if jpeg_bytes is None: if jpeg_bytes is None:
self._write_result(request_id, { self._send_result(callback_url, request_id, camera_code, {
"status": "error", "status": "error",
"message": f"摄像头流未连接: {camera_code}" "message": f"摄像头流未连接: {camera_code}",
}) })
return return
@@ -191,23 +201,17 @@ class ScreenshotHandler:
url = self._upload_to_cos(jpeg_bytes, cos_path) url = self._upload_to_cos(jpeg_bytes, cos_path)
if url is None: if url is None:
self._write_result(request_id, { self._send_result(callback_url, request_id, camera_code, {
"status": "error", "status": "error",
"message": "COS 上传失败" "message": "COS 上传失败",
}) })
return return
# 3. 结果 # 3. 发送结果
result = {"status": "ok", "url": url} self._send_result(callback_url, request_id, camera_code, {
self._write_result(request_id, result) "status": "ok",
"url": url,
# 4. 写缓存 })
cache_key = SNAP_CACHE_KEY_PREFIX + camera_code
cache_data = json.dumps({"url": url, "timestamp": int(time.time())})
try:
self._cloud_redis.set(cache_key, cache_data, ex=SNAP_CACHE_TTL)
except Exception as e:
logger.warning("[截图] 写入缓存失败: %s", e)
logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url)
@@ -247,8 +251,11 @@ class ScreenshotHandler:
# ==================== COS 上传 ==================== # ==================== COS 上传 ====================
# 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL300s
PRESIGNED_URL_EXPIRES = 3600
def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]:
"""上传 JPEG 到 COS返回访问 URL""" """上传 JPEG 到 COS返回预签名访问 URL"""
if not self._cos_client: if not self._cos_client:
logger.error("[截图] COS 客户端未初始化") logger.error("[截图] COS 客户端未初始化")
return None return None
@@ -261,22 +268,53 @@ class ScreenshotHandler:
Key=cos_path, Key=cos_path,
ContentType="image/jpeg", ContentType="image/jpeg",
) )
url = "https://{}.cos.{}.myqcloud.com/{}".format( # 生成预签名 URL不附加额外 Params保持 URL 简洁)
self._cos_config.bucket, self._cos_config.region, cos_path url = self._cos_client.get_presigned_url(
Method="GET",
Bucket=self._cos_config.bucket,
Key=cos_path,
Expired=self.PRESIGNED_URL_EXPIRES,
) )
return url return url
except Exception as e: except Exception as e:
logger.error("[截图] COS 上传失败: %s", e) logger.error("[截图] COS 上传失败: %s", e)
return None return None
# ==================== Redis 结果 ==================== # ==================== 结果发送(回调 + 降级) ====================
def _write_result(self, request_id: str, data: dict): def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict):
"""将结果写入 Redis keyTTL 60s""" """发送截图结果:优先 HTTP 回调,失败则降级写 Redis"""
if not request_id: if not request_id:
return return
# 构造回调 body
body = {
"request_id": request_id,
"camera_code": camera_code,
**data,
}
# 优先尝试 HTTP 回调
if callback_url:
try:
url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH
resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT)
if resp.status_code < 300:
logger.info("[截图] HTTP 回调成功: request_id=%s", request_id)
return
else:
logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id)
except Exception as e:
logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e)
# 降级:写 Redis result key
self._write_result_redis(request_id, body)
def _write_result_redis(self, request_id: str, data: dict):
"""降级:将结果写入 Redis keyTTL 60s"""
key = SNAP_RESULT_KEY_PREFIX + request_id key = SNAP_RESULT_KEY_PREFIX + request_id
try: try:
self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL)
logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id)
except Exception as e: except Exception as e:
logger.error("[截图] 写入结果失败: %s", e) logger.error("[截图] 降级写 Redis 也失败: %s", e)