Files
security-ai-edge/core/screenshot_handler.py

349 lines
13 KiB
Python
Raw Normal View History

"""
截图处理模块 - 监听云端 Redis Stream 截图请求截图后上传 COS
数据流
WVP XADD edge_snap_request Edge 消费 capture frame upload COS HTTP 回调 WVP
回调失败时降级写 Redis snap:result:{id}
"""
import io
import json
import logging
import threading
import time
import uuid
from typing import Optional
import cv2
import numpy as np
import requests
from config.settings import get_settings, COSConfig
logger = logging.getLogger(__name__)
# Redis 常量
SNAP_REQUEST_STREAM = "edge_snap_request"
SNAP_CONSUMER_GROUP = "edge_snap_group"
SNAP_CONSUMER_NAME = "edge_snap_worker"
SNAP_RESULT_KEY_PREFIX = "snap:result:"
SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期
# HTTP 回调
SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback"
SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒)
class ScreenshotHandler:
"""截图处理器
从云端 Redis Stream 消费截图请求通过 MultiStreamManager 获取最新帧
JPEG 编码后直传 COS最后通过 HTTP 回调 WVP 返回结果 URL
回调失败时降级写 Redis
"""
def __init__(self, cloud_redis, stream_manager):
"""
Args:
cloud_redis: 云端 Redis 客户端 (redis.Redis)
stream_manager: MultiStreamManager 实例
"""
self._cloud_redis = cloud_redis
self._stream_manager = stream_manager
self._settings = get_settings()
self._cos_client = None
self._cos_config: COSConfig = self._settings.cos
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# ==================== 生命周期 ====================
def start(self):
"""启动截图监听线程"""
if not self._cloud_redis:
logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动")
return
if not self._cos_config.secret_id or not self._cos_config.bucket:
logger.warning("[截图] COS 配置不完整,截图处理器未启动")
return
self._init_cos_client()
self._ensure_consumer_group()
self._stop_event.clear()
self._thread = threading.Thread(
target=self._listen_loop,
name="ScreenshotHandler",
daemon=True,
)
self._thread.start()
logger.info("[截图] 截图处理器已启动")
def stop(self):
"""停止截图监听线程"""
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
logger.info("[截图] 截图处理器已停止")
# ==================== COS 初始化 ====================
def _init_cos_client(self):
"""初始化 COS 客户端"""
try:
from qcloud_cos import CosConfig, CosS3Client
config = CosConfig(
Region=self._cos_config.region,
SecretId=self._cos_config.secret_id,
SecretKey=self._cos_config.secret_key,
)
self._cos_client = CosS3Client(config)
logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s",
self._cos_config.bucket, self._cos_config.region)
except Exception as e:
logger.error("[截图] COS 客户端初始化失败: %s", e)
# ==================== Consumer Group ====================
def _ensure_consumer_group(self):
"""确保 Redis Consumer Group 存在"""
try:
self._cloud_redis.xgroup_create(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True
)
logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP)
except Exception as e:
# BUSYGROUP = group already exists, safe to ignore
if "BUSYGROUP" in str(e):
logger.debug("[截图] consumer group 已存在")
else:
logger.error("[截图] 创建 consumer group 失败: %s", e)
# ==================== 主循环 ====================
def _listen_loop(self):
"""XREADGROUP 循环消费截图请求"""
backoff = 5
max_backoff = 60
while not self._stop_event.is_set():
try:
results = self._cloud_redis.xreadgroup(
SNAP_CONSUMER_GROUP,
SNAP_CONSUMER_NAME,
{SNAP_REQUEST_STREAM: ">"},
count=1,
block=5000,
)
if not results:
continue
backoff = 5 # 重置退避
for stream_name, messages in results:
for msg_id, fields in messages:
try:
self._handle_request(fields)
except Exception as e:
logger.error("[截图] 处理请求失败: %s", e)
finally:
# ACK 消息
try:
self._cloud_redis.xack(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id
)
except Exception:
pass
except Exception as e:
if self._stop_event.is_set():
return
logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff)
self._stop_event.wait(backoff)
backoff = min(backoff * 2, max_backoff)
# ==================== 请求处理 ====================
def _handle_request(self, fields: dict):
"""处理单个截图请求"""
request_id = fields.get("request_id", "")
camera_code = fields.get("camera_code", "")
cos_path = fields.get("cos_path", "")
callback_url = fields.get("callback_url", "")
if not request_id or not camera_code or not cos_path:
logger.warning("[截图] 请求字段不完整: %s", fields)
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": "请求字段不完整",
})
return
rtsp_url = fields.get("rtsp_url", "")
logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s",
request_id, camera_code, callback_url or "(无)")
# 1. 抓帧
jpeg_bytes = self._capture_frame(camera_code, rtsp_url)
if jpeg_bytes is None:
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": f"摄像头流未连接: {camera_code}",
})
return
# 2. 上传 COS失败重试 1 次)
url = self._upload_to_cos(jpeg_bytes, cos_path)
if url is None:
# 重试一次
logger.info("[截图] COS 上传失败,重试一次...")
url = self._upload_to_cos(jpeg_bytes, cos_path)
if url is None:
self._send_result(callback_url, request_id, camera_code, {
"status": "error",
"message": "COS 上传失败",
})
return
# 3. 发送结果
self._send_result(callback_url, request_id, camera_code, {
"status": "ok",
"url": url,
})
logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url)
# ==================== 抓帧 ====================
def _capture_frame(self, camera_code: str, rtsp_url: str = "") -> Optional[bytes]:
"""从 MultiStreamManager 获取最新帧,无流时降级临时 RTSP 连接"""
# 优先从已有视频流获取(有 ROI 的摄像头,已连接)
stream = self._stream_manager.get_stream(camera_code)
if stream is not None and stream.is_connected:
frame = stream.get_latest_frame(timeout=2.0)
if frame is None:
frame = stream.read(timeout=2.0)
if frame is not None:
return self._encode_jpeg(frame.image)
# 降级:临时 RTSP 连接抓帧(无 ROI 或流未连接的摄像头)
if rtsp_url:
return self._capture_ondemand(camera_code, rtsp_url)
logger.warning("[截图] 未找到摄像头流且无 rtsp_url: %s", camera_code)
return None
def _capture_ondemand(self, camera_code: str, rtsp_url: str) -> Optional[bytes]:
"""临时连接 RTSP 抓取一帧,用完即断"""
cap = None
try:
logger.info("[截图] 临时连接 RTSP: %s%s", camera_code, rtsp_url)
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if not cap.isOpened():
logger.warning("[截图] 临时 RTSP 连接失败: %s", camera_code)
return None
# 读取几帧丢弃(跳过关键帧解码延迟),取最后一帧
frame = None
for _ in range(5):
ret, f = cap.read()
if ret and f is not None:
frame = f
if frame is None:
logger.warning("[截图] 临时连接读帧失败: %s", camera_code)
return None
return self._encode_jpeg(frame)
except Exception as e:
logger.error("[截图] 临时截图异常: %s, %s", camera_code, e)
return None
finally:
if cap is not None:
cap.release()
logger.debug("[截图] 临时 RTSP 连接已释放: %s", camera_code)
def _encode_jpeg(self, image) -> Optional[bytes]:
"""将帧编码为 JPEG 字节"""
try:
success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, 85])
return buffer.tobytes() if success else None
except Exception as e:
logger.error("[截图] JPEG 编码失败: %s", e)
return None
# ==================== COS 上传 ====================
# 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL300s
PRESIGNED_URL_EXPIRES = 3600
def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]:
"""上传 JPEG 到 COS返回预签名访问 URL"""
if not self._cos_client:
logger.error("[截图] COS 客户端未初始化")
return None
try:
body = io.BytesIO(jpeg_bytes)
self._cos_client.put_object(
Bucket=self._cos_config.bucket,
Body=body,
Key=cos_path,
ContentType="image/jpeg",
)
# 生成预签名 URL不附加额外 Params保持 URL 简洁)
url = self._cos_client.get_presigned_url(
Method="GET",
Bucket=self._cos_config.bucket,
Key=cos_path,
Expired=self.PRESIGNED_URL_EXPIRES,
)
return url
except Exception as e:
logger.error("[截图] COS 上传失败: %s", e)
return None
# ==================== 结果发送(回调 + 降级) ====================
def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict):
"""发送截图结果:优先 HTTP 回调,失败则降级写 Redis"""
if not request_id:
return
# 构造回调 body
body = {
"request_id": request_id,
"camera_code": camera_code,
**data,
}
# 优先尝试 HTTP 回调
if callback_url:
try:
url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH
resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT)
if resp.status_code < 300:
logger.info("[截图] HTTP 回调成功: request_id=%s", request_id)
return
else:
logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id)
except Exception as e:
logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e)
# 降级:写 Redis result key
self._write_result_redis(request_id, body)
def _write_result_redis(self, request_id: str, data: dict):
"""降级:将结果写入 Redis keyTTL 60s"""
key = SNAP_RESULT_KEY_PREFIX + request_id
try:
self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL)
logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id)
except Exception as e:
logger.error("[截图] 降级写 Redis 也失败: %s", e)