From f70e6b6003b935e6959f3920a583e56360391c88 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 27 Feb 2026 17:22:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(edge):=20=E6=96=B0=E5=A2=9E=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=A4=84=E7=90=86=E6=A8=A1=E5=9D=97=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=BF=9C=E7=A8=8B=E6=88=AA=E5=9B=BE=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 core/screenshot_handler.py:监听云端 Redis Stream 截图请求, 抓帧后直传 COS,将结果 URL 写回 Redis - main.py 集成 ScreenshotHandler 的初始化和停止 - requirements.txt 添加 cos-python-sdk-v5 依赖 Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 282 +++++++++++++++++++++++++++++++++++++ main.py | 22 +++ requirements.txt | 3 + 3 files changed, 307 insertions(+) create mode 100644 core/screenshot_handler.py diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py new file mode 100644 index 0000000..8a38ff4 --- /dev/null +++ b/core/screenshot_handler.py @@ -0,0 +1,282 @@ +""" +截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS + +数据流: + WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → SET snap:result:{id} +""" + +import io +import json +import logging +import threading +import time +import uuid +from typing import Optional + +import cv2 +import numpy as np + +from config.settings import get_settings, COSConfig + +logger = logging.getLogger(__name__) + +# Redis 常量 +SNAP_REQUEST_STREAM = "edge_snap_request" +SNAP_CONSUMER_GROUP = "edge_snap_group" +SNAP_CONSUMER_NAME = "edge_snap_worker" +SNAP_RESULT_KEY_PREFIX = "snap:result:" +SNAP_CACHE_KEY_PREFIX = "snap:cache:" +SNAP_RESULT_TTL = 60 # 结果 key 60s 过期 +SNAP_CACHE_TTL = 300 # 缓存 key 5min 过期 + + +class ScreenshotHandler: + """截图处理器 + + 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, + JPEG 编码后直传 COS,最后将结果 URL 写回 Redis。 + """ + + def __init__(self, cloud_redis, stream_manager): + """ + Args: + cloud_redis: 云端 Redis 客户端 (redis.Redis) + stream_manager: MultiStreamManager 实例 + """ + self._cloud_redis = cloud_redis + self._stream_manager = stream_manager + self._settings = get_settings() + self._cos_client = None + self._cos_config: COSConfig = self._settings.cos + + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + # ==================== 生命周期 ==================== + + def start(self): + """启动截图监听线程""" + if not self._cloud_redis: + logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动") + return + if not self._cos_config.secret_id or not self._cos_config.bucket: + logger.warning("[截图] COS 配置不完整,截图处理器未启动") + return + + self._init_cos_client() + self._ensure_consumer_group() + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._listen_loop, + name="ScreenshotHandler", + daemon=True, + ) + self._thread.start() + logger.info("[截图] 截图处理器已启动") + + def stop(self): + """停止截图监听线程""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + logger.info("[截图] 截图处理器已停止") + + # ==================== COS 初始化 ==================== + + def _init_cos_client(self): + """初始化 COS 客户端""" + try: + from qcloud_cos import CosConfig, CosS3Client + + config = CosConfig( + Region=self._cos_config.region, + SecretId=self._cos_config.secret_id, + SecretKey=self._cos_config.secret_key, + ) + self._cos_client = CosS3Client(config) + logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s", + self._cos_config.bucket, self._cos_config.region) + except Exception as e: + logger.error("[截图] COS 客户端初始化失败: %s", e) + + # ==================== Consumer Group ==================== + + def _ensure_consumer_group(self): + """确保 Redis Consumer Group 存在""" + try: + self._cloud_redis.xgroup_create( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True + ) + logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP) + except Exception as e: + # BUSYGROUP = group already exists, safe to ignore + if "BUSYGROUP" in str(e): + logger.debug("[截图] consumer group 已存在") + else: + logger.error("[截图] 创建 consumer group 失败: %s", e) + + # ==================== 主循环 ==================== + + def _listen_loop(self): + """XREADGROUP 循环消费截图请求""" + backoff = 5 + max_backoff = 60 + + while not self._stop_event.is_set(): + try: + results = self._cloud_redis.xreadgroup( + SNAP_CONSUMER_GROUP, + SNAP_CONSUMER_NAME, + {SNAP_REQUEST_STREAM: ">"}, + count=1, + block=5000, + ) + if not results: + continue + + backoff = 5 # 重置退避 + + for stream_name, messages in results: + for msg_id, fields in messages: + try: + self._handle_request(fields) + except Exception as e: + logger.error("[截图] 处理请求失败: %s", e) + finally: + # ACK 消息 + try: + self._cloud_redis.xack( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id + ) + except Exception: + pass + + except Exception as e: + if self._stop_event.is_set(): + return + logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + + # ==================== 请求处理 ==================== + + def _handle_request(self, fields: dict): + """处理单个截图请求""" + request_id = fields.get("request_id", "") + camera_code = fields.get("camera_code", "") + cos_path = fields.get("cos_path", "") + + if not request_id or not camera_code or not cos_path: + logger.warning("[截图] 请求字段不完整: %s", fields) + self._write_result(request_id, {"status": "error", "message": "请求字段不完整"}) + return + + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s", request_id, camera_code) + + # 1. 抓帧 + jpeg_bytes = self._capture_frame(camera_code) + if jpeg_bytes is None: + self._write_result(request_id, { + "status": "error", + "message": f"摄像头流未连接: {camera_code}" + }) + return + + # 2. 上传 COS(失败重试 1 次) + url = self._upload_to_cos(jpeg_bytes, cos_path) + if url is None: + # 重试一次 + logger.info("[截图] COS 上传失败,重试一次...") + url = self._upload_to_cos(jpeg_bytes, cos_path) + + if url is None: + self._write_result(request_id, { + "status": "error", + "message": "COS 上传失败" + }) + return + + # 3. 写结果 + result = {"status": "ok", "url": url} + self._write_result(request_id, result) + + # 4. 写缓存 + cache_key = SNAP_CACHE_KEY_PREFIX + camera_code + cache_data = json.dumps({"url": url, "timestamp": int(time.time())}) + try: + self._cloud_redis.set(cache_key, cache_data, ex=SNAP_CACHE_TTL) + except Exception as e: + logger.warning("[截图] 写入缓存失败: %s", e) + + logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) + + # ==================== 抓帧 ==================== + + def _capture_frame(self, camera_code: str) -> Optional[bytes]: + """从 MultiStreamManager 获取最新帧并编码为 JPEG""" + stream = self._stream_manager.get_stream(camera_code) + if stream is None: + logger.warning("[截图] 未找到摄像头流: %s", camera_code) + return None + + if not stream.is_connected: + logger.warning("[截图] 摄像头未连接: %s", camera_code) + return None + + frame = stream.get_latest_frame(timeout=2.0) + if frame is None: + # 回退:直接从缓冲区读一帧 + frame = stream.read(timeout=2.0) + + if frame is None: + logger.warning("[截图] 获取帧超时: %s", camera_code) + return None + + # JPEG 编码 + try: + encode_params = [cv2.IMWRITE_JPEG_QUALITY, 85] + success, buffer = cv2.imencode(".jpg", frame.image, encode_params) + if not success: + logger.error("[截图] JPEG 编码失败: %s", camera_code) + return None + return buffer.tobytes() + except Exception as e: + logger.error("[截图] 帧编码异常: %s", e) + return None + + # ==================== COS 上传 ==================== + + def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: + """上传 JPEG 到 COS,返回访问 URL""" + if not self._cos_client: + logger.error("[截图] COS 客户端未初始化") + return None + + try: + body = io.BytesIO(jpeg_bytes) + self._cos_client.put_object( + Bucket=self._cos_config.bucket, + Body=body, + Key=cos_path, + ContentType="image/jpeg", + ) + url = "https://{}.cos.{}.myqcloud.com/{}".format( + self._cos_config.bucket, self._cos_config.region, cos_path + ) + return url + except Exception as e: + logger.error("[截图] COS 上传失败: %s", e) + return None + + # ==================== Redis 结果 ==================== + + def _write_result(self, request_id: str, data: dict): + """将结果写入 Redis key(TTL 60s)""" + if not request_id: + return + key = SNAP_RESULT_KEY_PREFIX + request_id + try: + self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) + except Exception as e: + logger.error("[截图] 写入结果失败: %s", e) diff --git a/main.py b/main.py index 5047693..6807511 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ from core.tensorrt_engine import TensorRTEngine, EngineManager from core.postprocessor import PostProcessor from core.result_reporter import ResultReporter from core.alarm_upload_worker import AlarmUploadWorker +from core.screenshot_handler import ScreenshotHandler from algorithms import AlgorithmManager from utils.logger import get_logger, StructuredLogger from utils.version_control import get_version_control @@ -48,6 +49,7 @@ class EdgeInferenceService: self._postprocessor: Optional[PostProcessor] = None self._reporter: Optional[ResultReporter] = None self._alarm_worker: Optional[AlarmUploadWorker] = None + self._screenshot_handler: Optional[ScreenshotHandler] = None self._algorithm_manager: Optional[AlgorithmManager] = None self._debug_reload_thread: Optional[threading.Thread] = None self._debug_http_server = None @@ -178,6 +180,22 @@ class EdgeInferenceService: self._logger.info("算法管理器初始化成功") except Exception as e: self._logger.error(f"算法管理器初始化失败: {e}") + + def _init_screenshot_handler(self): + """初始化截图处理器""" + try: + cloud_redis = getattr(self._config_manager, '_cloud_redis', None) + if cloud_redis and self._stream_manager: + self._screenshot_handler = ScreenshotHandler( + cloud_redis=cloud_redis, + stream_manager=self._stream_manager, + ) + self._screenshot_handler.start() + self._logger.info("截图处理器初始化成功") + else: + self._logger.info("截图处理器跳过初始化(云端 Redis 或流管理器不可用)") + except Exception as e: + self._logger.error(f"截图处理器初始化失败: {e}") def _start_debug_reload_watcher(self): """本地调试:监听文件触发同步""" @@ -257,6 +275,7 @@ class EdgeInferenceService: self._init_postprocessor() self._init_reporter() self._init_algorithm_manager() + self._init_screenshot_handler() self._start_debug_reload_watcher() self._start_debug_http_server() @@ -848,6 +867,9 @@ class EdgeInferenceService: if self._alarm_worker: self._alarm_worker.stop() + if self._screenshot_handler: + self._screenshot_handler.stop() + if self._reporter: self._reporter.close() diff --git a/requirements.txt b/requirements.txt index 0efb946..f081142 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,9 @@ paho-mqtt==1.6.1 # Redis客户端 - Redis 4.6.0,4.x最终稳定版 redis==4.6.0 +# 腾讯云COS SDK - 用于截图上传 +cos-python-sdk-v5>=1.9.30 + # ============================================================ # 工具库 # ============================================================