Files
security-ai-edge/core/screenshot_handler.py
16337 9a1ac16f19 修复:截图回调禁用系统代理,解决502回调失败
requests.post回调WVP时被本地代理(127.0.0.1:7897)拦截导致502,
添加proxies=None绕过系统代理直连WVP。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 09:40:51 +08:00

350 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS
数据流:
WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP
(回调失败时降级写 Redis snap:result:{id}
"""
import io
import json
import logging
import threading
import time
import uuid
from typing import Optional
import cv2
import numpy as np
import requests
from config.settings import get_settings, COSConfig
logger = logging.getLogger(__name__)
# Redis 常量
SNAP_REQUEST_STREAM = "edge_snap_request"
SNAP_CONSUMER_GROUP = "edge_snap_group"
SNAP_CONSUMER_NAME = "edge_snap_worker"
SNAP_RESULT_KEY_PREFIX = "snap:result:"
SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期
# HTTP 回调
SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback"
SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒)
class ScreenshotHandler:
"""截图处理器
从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧,
JPEG 编码后直传 COS最后通过 HTTP 回调 WVP 返回结果 URL。
回调失败时降级写 Redis。
"""
def __init__(self, cloud_redis, stream_manager):
"""
Args:
cloud_redis: 云端 Redis 客户端 (redis.Redis)
stream_manager: MultiStreamManager 实例
"""
self._cloud_redis = cloud_redis
self._stream_manager = stream_manager
self._settings = get_settings()
self._cos_client = None
self._cos_config: COSConfig = self._settings.cos
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# ==================== 生命周期 ====================
def start(self):
"""启动截图监听线程"""
if not self._cloud_redis:
logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动")
return
if not self._cos_config.secret_id or not self._cos_config.bucket:
logger.warning("[截图] COS 配置不完整,截图处理器未启动")
return
self._init_cos_client()
self._ensure_consumer_group()
self._stop_event.clear()
self._thread = threading.Thread(
target=self._listen_loop,
name="ScreenshotHandler",
daemon=True,
)
self._thread.start()
logger.info("[截图] 截图处理器已启动")
def stop(self):
"""停止截图监听线程"""
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
logger.info("[截图] 截图处理器已停止")
# ==================== COS 初始化 ====================
def _init_cos_client(self):
"""初始化 COS 客户端"""
try:
from qcloud_cos import CosConfig, CosS3Client
config = CosConfig(
Region=self._cos_config.region,
SecretId=self._cos_config.secret_id,
SecretKey=self._cos_config.secret_key,
)
self._cos_client = CosS3Client(config)
logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s",
self._cos_config.bucket, self._cos_config.region)
except Exception as e:
logger.error("[截图] COS 客户端初始化失败: %s", e)
# ==================== Consumer Group ====================
def _ensure_consumer_group(self):
"""确保 Redis Consumer Group 存在"""
try:
self._cloud_redis.xgroup_create(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True
)
logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP)
except Exception as e:
# BUSYGROUP = group already exists, safe to ignore
if "BUSYGROUP" in str(e):
logger.debug("[截图] consumer group 已存在")
else:
logger.error("[截图] 创建 consumer group 失败: %s", e)
# ==================== 主循环 ====================
def _listen_loop(self):
"""XREADGROUP 循环消费截图请求"""
backoff = 5
max_backoff = 60
while not self._stop_event.is_set():
try:
results = self._cloud_redis.xreadgroup(
SNAP_CONSUMER_GROUP,
SNAP_CONSUMER_NAME,
{SNAP_REQUEST_STREAM: ">"},
count=1,
block=5000,
)
if not results:
continue
backoff = 5 # 重置退避
for stream_name, messages in results:
for msg_id, fields in messages:
try:
self._handle_request(fields)
except Exception as e:
logger.error("[截图] 处理请求失败: %s", e)
finally:
# ACK 消息
try:
self._cloud_redis.xack(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id
)
except Exception:
pass
except Exception as e:
if self._stop_event.is_set():
return
logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff)
self._stop_event.wait(backoff)
backoff = min(backoff * 2, max_backoff)
# ==================== 请求处理 ====================
def _handle_request(self, fields: dict):
"""处理单个截图请求"""
request_id = fields.get("request_id", "")
camera_code = fields.get("camera_code", "")
cos_path = fields.get("cos_path", "")
callback_url = fields.get("callback_url", "")
if not request_id or not camera_code or not cos_path:
logger.warning("[截图] 请求字段不完整: %s", fields)
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": "请求字段不完整",
})
return
rtsp_url = fields.get("rtsp_url", "")
logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s",
request_id, camera_code, callback_url or "(无)")
# 1. 抓帧
jpeg_bytes = self._capture_frame(camera_code, rtsp_url)
if jpeg_bytes is None:
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": f"摄像头流未连接: {camera_code}",
})
return
# 2. 上传 COS失败重试 1 次)
url = self._upload_to_cos(jpeg_bytes, cos_path)
if url is None:
# 重试一次
logger.info("[截图] COS 上传失败,重试一次...")
url = self._upload_to_cos(jpeg_bytes, cos_path)
if url is None:
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": "COS 上传失败",
})
return
# 3. 发送结果
self._send_result(callback_url, request_id, camera_code, {
"status": "ok",
"url": url,
})
logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url)
# ==================== 抓帧 ====================
def _capture_frame(self, camera_code: str, rtsp_url: str = "") -> Optional[bytes]:
"""从 MultiStreamManager 获取最新帧,无流时降级临时 RTSP 连接"""
# 优先从已有视频流获取(有 ROI 的摄像头,已连接)
stream = self._stream_manager.get_stream(camera_code)
if stream is not None and stream.is_connected:
frame = stream.get_latest_frame(timeout=2.0)
if frame is None:
frame = stream.read(timeout=2.0)
if frame is not None:
return self._encode_jpeg(frame.image)
# 降级:临时 RTSP 连接抓帧(无 ROI 或流未连接的摄像头)
if rtsp_url:
return self._capture_ondemand(camera_code, rtsp_url)
logger.warning("[截图] 未找到摄像头流且无 rtsp_url: %s", camera_code)
return None
def _capture_ondemand(self, camera_code: str, rtsp_url: str) -> Optional[bytes]:
"""临时连接 RTSP 抓取一帧,用完即断"""
cap = None
try:
logger.info("[截图] 临时连接 RTSP: %s%s", camera_code, rtsp_url)
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if not cap.isOpened():
logger.warning("[截图] 临时 RTSP 连接失败: %s", camera_code)
return None
# 读取几帧丢弃(跳过关键帧解码延迟),取最后一帧
frame = None
for _ in range(5):
ret, f = cap.read()
if ret and f is not None:
frame = f
if frame is None:
logger.warning("[截图] 临时连接读帧失败: %s", camera_code)
return None
return self._encode_jpeg(frame)
except Exception as e:
logger.error("[截图] 临时截图异常: %s, %s", camera_code, e)
return None
finally:
if cap is not None:
cap.release()
logger.debug("[截图] 临时 RTSP 连接已释放: %s", camera_code)
def _encode_jpeg(self, image) -> Optional[bytes]:
"""将帧编码为 JPEG 字节"""
try:
success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, 85])
return buffer.tobytes() if success else None
except Exception as e:
logger.error("[截图] JPEG 编码失败: %s", e)
return None
# ==================== COS 上传 ====================
# 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL300s
PRESIGNED_URL_EXPIRES = 3600
def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]:
"""上传 JPEG 到 COS返回预签名访问 URL"""
if not self._cos_client:
logger.error("[截图] COS 客户端未初始化")
return None
try:
body = io.BytesIO(jpeg_bytes)
self._cos_client.put_object(
Bucket=self._cos_config.bucket,
Body=body,
Key=cos_path,
ContentType="image/jpeg",
)
# 生成预签名 URL不附加额外 Params保持 URL 简洁)
url = self._cos_client.get_presigned_url(
Method="GET",
Bucket=self._cos_config.bucket,
Key=cos_path,
Expired=self.PRESIGNED_URL_EXPIRES,
)
return url
except Exception as e:
logger.error("[截图] COS 上传失败: %s", e)
return None
# ==================== 结果发送(回调 + 降级) ====================
def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict):
"""发送截图结果:优先 HTTP 回调,失败则降级写 Redis"""
if not request_id:
return
# 构造回调 body
body = {
"request_id": request_id,
"camera_code": camera_code,
**data,
}
# 优先尝试 HTTP 回调
if callback_url:
try:
url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH
resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT,
proxies={"http": None, "https": None})
if resp.status_code < 300:
logger.info("[截图] HTTP 回调成功: request_id=%s", request_id)
return
else:
logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id)
except Exception as e:
logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e)
# 降级:写 Redis result key
self._write_result_redis(request_id, body)
def _write_result_redis(self, request_id: str, data: dict):
"""降级:将结果写入 Redis keyTTL 60s"""
key = SNAP_RESULT_KEY_PREFIX + request_id
try:
self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL)
logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id)
except Exception as e:
logger.error("[截图] 降级写 Redis 也失败: %s", e)