From f70e6b6003b935e6959f3920a583e56360391c88 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 27 Feb 2026 17:22:49 +0800 Subject: [PATCH 01/11] =?UTF-8?q?feat(edge):=20=E6=96=B0=E5=A2=9E=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=A4=84=E7=90=86=E6=A8=A1=E5=9D=97=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=BF=9C=E7=A8=8B=E6=88=AA=E5=9B=BE=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 core/screenshot_handler.py:监听云端 Redis Stream 截图请求, 抓帧后直传 COS,将结果 URL 写回 Redis - main.py 集成 ScreenshotHandler 的初始化和停止 - requirements.txt 添加 cos-python-sdk-v5 依赖 Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 282 +++++++++++++++++++++++++++++++++++++ main.py | 22 +++ requirements.txt | 3 + 3 files changed, 307 insertions(+) create mode 100644 core/screenshot_handler.py diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py new file mode 100644 index 0000000..8a38ff4 --- /dev/null +++ b/core/screenshot_handler.py @@ -0,0 +1,282 @@ +""" +截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS + +数据流: + WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → SET snap:result:{id} +""" + +import io +import json +import logging +import threading +import time +import uuid +from typing import Optional + +import cv2 +import numpy as np + +from config.settings import get_settings, COSConfig + +logger = logging.getLogger(__name__) + +# Redis 常量 +SNAP_REQUEST_STREAM = "edge_snap_request" +SNAP_CONSUMER_GROUP = "edge_snap_group" +SNAP_CONSUMER_NAME = "edge_snap_worker" +SNAP_RESULT_KEY_PREFIX = "snap:result:" +SNAP_CACHE_KEY_PREFIX = "snap:cache:" +SNAP_RESULT_TTL = 60 # 结果 key 60s 过期 +SNAP_CACHE_TTL = 300 # 缓存 key 5min 过期 + + +class ScreenshotHandler: + """截图处理器 + + 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, + JPEG 编码后直传 COS,最后将结果 URL 写回 Redis。 + """ + + def __init__(self, cloud_redis, stream_manager): + """ + Args: + cloud_redis: 云端 Redis 客户端 (redis.Redis) + stream_manager: MultiStreamManager 实例 + """ + self._cloud_redis = cloud_redis + self._stream_manager = stream_manager + self._settings = get_settings() + self._cos_client = None + self._cos_config: COSConfig = self._settings.cos + + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + # ==================== 生命周期 ==================== + + def start(self): + """启动截图监听线程""" + if not self._cloud_redis: + logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动") + return + if not self._cos_config.secret_id or not self._cos_config.bucket: + logger.warning("[截图] COS 配置不完整,截图处理器未启动") + return + + self._init_cos_client() + self._ensure_consumer_group() + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._listen_loop, + name="ScreenshotHandler", + daemon=True, + ) + self._thread.start() + logger.info("[截图] 截图处理器已启动") + + def stop(self): + """停止截图监听线程""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + logger.info("[截图] 截图处理器已停止") + + # ==================== COS 初始化 ==================== + + def _init_cos_client(self): + """初始化 COS 客户端""" + try: + from qcloud_cos import CosConfig, CosS3Client + + config = CosConfig( + Region=self._cos_config.region, + SecretId=self._cos_config.secret_id, + SecretKey=self._cos_config.secret_key, + ) + self._cos_client = CosS3Client(config) + logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s", + self._cos_config.bucket, self._cos_config.region) + except Exception as e: + logger.error("[截图] COS 客户端初始化失败: %s", e) + + # ==================== Consumer Group ==================== + + def _ensure_consumer_group(self): + """确保 Redis Consumer Group 存在""" + try: + self._cloud_redis.xgroup_create( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True + ) + logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP) + except Exception as e: + # BUSYGROUP = group already exists, safe to ignore + if "BUSYGROUP" in str(e): + logger.debug("[截图] consumer group 已存在") + else: + logger.error("[截图] 创建 consumer group 失败: %s", e) + + # ==================== 主循环 ==================== + + def _listen_loop(self): + """XREADGROUP 循环消费截图请求""" + backoff = 5 + max_backoff = 60 + + while not self._stop_event.is_set(): + try: + results = self._cloud_redis.xreadgroup( + SNAP_CONSUMER_GROUP, + SNAP_CONSUMER_NAME, + {SNAP_REQUEST_STREAM: ">"}, + count=1, + block=5000, + ) + if not results: + continue + + backoff = 5 # 重置退避 + + for stream_name, messages in results: + for msg_id, fields in messages: + try: + self._handle_request(fields) + except Exception as e: + logger.error("[截图] 处理请求失败: %s", e) + finally: + # ACK 消息 + try: + self._cloud_redis.xack( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id + ) + except Exception: + pass + + except Exception as e: + if self._stop_event.is_set(): + return + logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + + # ==================== 请求处理 ==================== + + def _handle_request(self, fields: dict): + """处理单个截图请求""" + request_id = fields.get("request_id", "") + camera_code = fields.get("camera_code", "") + cos_path = fields.get("cos_path", "") + + if not request_id or not camera_code or not cos_path: + logger.warning("[截图] 请求字段不完整: %s", fields) + self._write_result(request_id, {"status": "error", "message": "请求字段不完整"}) + return + + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s", request_id, camera_code) + + # 1. 抓帧 + jpeg_bytes = self._capture_frame(camera_code) + if jpeg_bytes is None: + self._write_result(request_id, { + "status": "error", + "message": f"摄像头流未连接: {camera_code}" + }) + return + + # 2. 上传 COS(失败重试 1 次) + url = self._upload_to_cos(jpeg_bytes, cos_path) + if url is None: + # 重试一次 + logger.info("[截图] COS 上传失败,重试一次...") + url = self._upload_to_cos(jpeg_bytes, cos_path) + + if url is None: + self._write_result(request_id, { + "status": "error", + "message": "COS 上传失败" + }) + return + + # 3. 写结果 + result = {"status": "ok", "url": url} + self._write_result(request_id, result) + + # 4. 写缓存 + cache_key = SNAP_CACHE_KEY_PREFIX + camera_code + cache_data = json.dumps({"url": url, "timestamp": int(time.time())}) + try: + self._cloud_redis.set(cache_key, cache_data, ex=SNAP_CACHE_TTL) + except Exception as e: + logger.warning("[截图] 写入缓存失败: %s", e) + + logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) + + # ==================== 抓帧 ==================== + + def _capture_frame(self, camera_code: str) -> Optional[bytes]: + """从 MultiStreamManager 获取最新帧并编码为 JPEG""" + stream = self._stream_manager.get_stream(camera_code) + if stream is None: + logger.warning("[截图] 未找到摄像头流: %s", camera_code) + return None + + if not stream.is_connected: + logger.warning("[截图] 摄像头未连接: %s", camera_code) + return None + + frame = stream.get_latest_frame(timeout=2.0) + if frame is None: + # 回退:直接从缓冲区读一帧 + frame = stream.read(timeout=2.0) + + if frame is None: + logger.warning("[截图] 获取帧超时: %s", camera_code) + return None + + # JPEG 编码 + try: + encode_params = [cv2.IMWRITE_JPEG_QUALITY, 85] + success, buffer = cv2.imencode(".jpg", frame.image, encode_params) + if not success: + logger.error("[截图] JPEG 编码失败: %s", camera_code) + return None + return buffer.tobytes() + except Exception as e: + logger.error("[截图] 帧编码异常: %s", e) + return None + + # ==================== COS 上传 ==================== + + def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: + """上传 JPEG 到 COS,返回访问 URL""" + if not self._cos_client: + logger.error("[截图] COS 客户端未初始化") + return None + + try: + body = io.BytesIO(jpeg_bytes) + self._cos_client.put_object( + Bucket=self._cos_config.bucket, + Body=body, + Key=cos_path, + ContentType="image/jpeg", + ) + url = "https://{}.cos.{}.myqcloud.com/{}".format( + self._cos_config.bucket, self._cos_config.region, cos_path + ) + return url + except Exception as e: + logger.error("[截图] COS 上传失败: %s", e) + return None + + # ==================== Redis 结果 ==================== + + def _write_result(self, request_id: str, data: dict): + """将结果写入 Redis key(TTL 60s)""" + if not request_id: + return + key = SNAP_RESULT_KEY_PREFIX + request_id + try: + self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) + except Exception as e: + logger.error("[截图] 写入结果失败: %s", e) diff --git a/main.py b/main.py index 5047693..6807511 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ from core.tensorrt_engine import TensorRTEngine, EngineManager from core.postprocessor import PostProcessor from core.result_reporter import ResultReporter from core.alarm_upload_worker import AlarmUploadWorker +from core.screenshot_handler import ScreenshotHandler from algorithms import AlgorithmManager from utils.logger import get_logger, StructuredLogger from utils.version_control import get_version_control @@ -48,6 +49,7 @@ class EdgeInferenceService: self._postprocessor: Optional[PostProcessor] = None self._reporter: Optional[ResultReporter] = None self._alarm_worker: Optional[AlarmUploadWorker] = None + self._screenshot_handler: Optional[ScreenshotHandler] = None self._algorithm_manager: Optional[AlgorithmManager] = None self._debug_reload_thread: Optional[threading.Thread] = None self._debug_http_server = None @@ -178,6 +180,22 @@ class EdgeInferenceService: self._logger.info("算法管理器初始化成功") except Exception as e: self._logger.error(f"算法管理器初始化失败: {e}") + + def _init_screenshot_handler(self): + """初始化截图处理器""" + try: + cloud_redis = getattr(self._config_manager, '_cloud_redis', None) + if cloud_redis and self._stream_manager: + self._screenshot_handler = ScreenshotHandler( + cloud_redis=cloud_redis, + stream_manager=self._stream_manager, + ) + self._screenshot_handler.start() + self._logger.info("截图处理器初始化成功") + else: + self._logger.info("截图处理器跳过初始化(云端 Redis 或流管理器不可用)") + except Exception as e: + self._logger.error(f"截图处理器初始化失败: {e}") def _start_debug_reload_watcher(self): """本地调试:监听文件触发同步""" @@ -257,6 +275,7 @@ class EdgeInferenceService: self._init_postprocessor() self._init_reporter() self._init_algorithm_manager() + self._init_screenshot_handler() self._start_debug_reload_watcher() self._start_debug_http_server() @@ -848,6 +867,9 @@ class EdgeInferenceService: if self._alarm_worker: self._alarm_worker.stop() + if self._screenshot_handler: + self._screenshot_handler.stop() + if self._reporter: self._reporter.close() diff --git a/requirements.txt b/requirements.txt index 0efb946..f081142 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,9 @@ paho-mqtt==1.6.1 # Redis客户端 - Redis 4.6.0,4.x最终稳定版 redis==4.6.0 +# 腾讯云COS SDK - 用于截图上传 +cos-python-sdk-v5>=1.9.30 + # ============================================================ # 工具库 # ============================================================ From 9ec949ef02ba4f920bc30e63c0da52f1d4dc033d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 09:04:26 +0800 Subject: [PATCH 02/11] =?UTF-8?q?fix(edge):=20LOCAL=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=8B=E7=8B=AC=E7=AB=8B=E8=BF=9E=E6=8E=A5=E4=BA=91=E7=AB=AF?= =?UTF-8?q?Redis=EF=BC=8C=E7=A1=AE=E4=BF=9D=E6=88=AA=E5=9B=BE=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=E5=8F=AF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CONFIG_SYNC_MODE=LOCAL 时 config_sync 不初始化云端 Redis, 截图处理器改为独立创建 Redis 连接。 Co-Authored-By: Claude Opus 4.6 --- main.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index 6807511..fea1fd7 100644 --- a/main.py +++ b/main.py @@ -184,7 +184,30 @@ class EdgeInferenceService: def _init_screenshot_handler(self): """初始化截图处理器""" try: + # 优先从 config_manager 获取已有的云端 Redis 连接 cloud_redis = getattr(self._config_manager, '_cloud_redis', None) + + # LOCAL 模式下 config_manager 不初始化云端 Redis,需要独立创建 + if cloud_redis is None: + try: + import redis + cfg = self._settings.cloud_redis + cloud_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True, + ) + cloud_redis.ping() + self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") + except Exception as e: + self._logger.warning(f"截图处理器无法连接云端 Redis: {e}") + cloud_redis = None + if cloud_redis and self._stream_manager: self._screenshot_handler = ScreenshotHandler( cloud_redis=cloud_redis, @@ -193,7 +216,7 @@ class EdgeInferenceService: self._screenshot_handler.start() self._logger.info("截图处理器初始化成功") else: - self._logger.info("截图处理器跳过初始化(云端 Redis 或流管理器不可用)") + self._logger.warning("截图处理器跳过初始化(云端 Redis 不可达或流管理器不可用)") except Exception as e: self._logger.error(f"截图处理器初始化失败: {e}") From 6d1e0e4a5e99cfc1feefa2fde1e330f8ab2f7425 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 15:48:12 +0800 Subject: [PATCH 03/11] =?UTF-8?q?feat(edge):=20=E6=88=AA=E5=9B=BE=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E6=94=B9=E4=B8=BAHTTP=E5=9B=9E=E8=B0=83=EF=BC=8CCOS?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E9=A2=84=E7=AD=BE=E5=90=8DURL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 截图完成后优先通过HTTP回调WVP返回结果,回调失败降级写Redis - COS上传后生成预签名URL(1小时有效期),不附加额外Params - 移除Edge端缓存逻辑(缓存由WVP端统一管理) Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 96 ++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 8a38ff4..238a435 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -2,7 +2,8 @@ 截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS 数据流: - WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → SET snap:result:{id} + WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP + (回调失败时降级写 Redis snap:result:{id}) """ import io @@ -15,6 +16,7 @@ from typing import Optional import cv2 import numpy as np +import requests from config.settings import get_settings, COSConfig @@ -25,16 +27,19 @@ SNAP_REQUEST_STREAM = "edge_snap_request" SNAP_CONSUMER_GROUP = "edge_snap_group" SNAP_CONSUMER_NAME = "edge_snap_worker" SNAP_RESULT_KEY_PREFIX = "snap:result:" -SNAP_CACHE_KEY_PREFIX = "snap:cache:" -SNAP_RESULT_TTL = 60 # 结果 key 60s 过期 -SNAP_CACHE_TTL = 300 # 缓存 key 5min 过期 +SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期 + +# HTTP 回调 +SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback" +SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒) class ScreenshotHandler: """截图处理器 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, - JPEG 编码后直传 COS,最后将结果 URL 写回 Redis。 + JPEG 编码后直传 COS,最后通过 HTTP 回调 WVP 返回结果 URL。 + 回调失败时降级写 Redis。 """ def __init__(self, cloud_redis, stream_manager): @@ -166,20 +171,25 @@ class ScreenshotHandler: request_id = fields.get("request_id", "") camera_code = fields.get("camera_code", "") cos_path = fields.get("cos_path", "") + callback_url = fields.get("callback_url", "") if not request_id or not camera_code or not cos_path: logger.warning("[截图] 请求字段不完整: %s", fields) - self._write_result(request_id, {"status": "error", "message": "请求字段不完整"}) + self._send_result(callback_url, request_id, camera_code, { + "status": "error", + "message": "请求字段不完整", + }) return - logger.info("[截图] 收到截图请求: request_id=%s, camera=%s", request_id, camera_code) + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", + request_id, camera_code, callback_url or "(无)") # 1. 抓帧 jpeg_bytes = self._capture_frame(camera_code) if jpeg_bytes is None: - self._write_result(request_id, { + self._send_result(callback_url, request_id, camera_code, { "status": "error", - "message": f"摄像头流未连接: {camera_code}" + "message": f"摄像头流未连接: {camera_code}", }) return @@ -191,23 +201,17 @@ class ScreenshotHandler: url = self._upload_to_cos(jpeg_bytes, cos_path) if url is None: - self._write_result(request_id, { + self._send_result(callback_url, request_id, camera_code, { "status": "error", - "message": "COS 上传失败" + "message": "COS 上传失败", }) return - # 3. 写结果 - result = {"status": "ok", "url": url} - self._write_result(request_id, result) - - # 4. 写缓存 - cache_key = SNAP_CACHE_KEY_PREFIX + camera_code - cache_data = json.dumps({"url": url, "timestamp": int(time.time())}) - try: - self._cloud_redis.set(cache_key, cache_data, ex=SNAP_CACHE_TTL) - except Exception as e: - logger.warning("[截图] 写入缓存失败: %s", e) + # 3. 发送结果 + self._send_result(callback_url, request_id, camera_code, { + "status": "ok", + "url": url, + }) logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) @@ -247,8 +251,11 @@ class ScreenshotHandler: # ==================== COS 上传 ==================== + # 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL(300s) + PRESIGNED_URL_EXPIRES = 3600 + def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: - """上传 JPEG 到 COS,返回访问 URL""" + """上传 JPEG 到 COS,返回预签名访问 URL""" if not self._cos_client: logger.error("[截图] COS 客户端未初始化") return None @@ -261,22 +268,53 @@ class ScreenshotHandler: Key=cos_path, ContentType="image/jpeg", ) - url = "https://{}.cos.{}.myqcloud.com/{}".format( - self._cos_config.bucket, self._cos_config.region, cos_path + # 生成预签名 URL(不附加额外 Params,保持 URL 简洁) + url = self._cos_client.get_presigned_url( + Method="GET", + Bucket=self._cos_config.bucket, + Key=cos_path, + Expired=self.PRESIGNED_URL_EXPIRES, ) return url except Exception as e: logger.error("[截图] COS 上传失败: %s", e) return None - # ==================== Redis 结果 ==================== + # ==================== 结果发送(回调 + 降级) ==================== - def _write_result(self, request_id: str, data: dict): - """将结果写入 Redis key(TTL 60s)""" + def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict): + """发送截图结果:优先 HTTP 回调,失败则降级写 Redis""" if not request_id: return + + # 构造回调 body + body = { + "request_id": request_id, + "camera_code": camera_code, + **data, + } + + # 优先尝试 HTTP 回调 + if callback_url: + try: + url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH + resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT) + if resp.status_code < 300: + logger.info("[截图] HTTP 回调成功: request_id=%s", request_id) + return + else: + logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id) + except Exception as e: + logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e) + + # 降级:写 Redis result key + self._write_result_redis(request_id, body) + + def _write_result_redis(self, request_id: str, data: dict): + """降级:将结果写入 Redis key(TTL 60s)""" key = SNAP_RESULT_KEY_PREFIX + request_id try: self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) + logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id) except Exception as e: - logger.error("[截图] 写入结果失败: %s", e) + logger.error("[截图] 降级写 Redis 也失败: %s", e) From a49a1be0eb7b3529a9bd976a47dc6068b416f9d7 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 15:48:18 +0800 Subject: [PATCH 04/11] =?UTF-8?q?fix(edge):=20=E4=BA=91=E7=AB=AFRedis?= =?UTF-8?q?=E9=BB=98=E8=AE=A4db=E6=94=B9=E4=B8=BA1=EF=BC=8C=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=9B=B4=E6=96=B0=E5=9B=9E=E8=B0=83=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=89=80=E6=9C=89=E5=90=8C=E6=AD=A5=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CloudRedisConfig.db 默认值从0改为1,与部署环境一致 - 配置更新回调不再限制为LOCAL模式,REDIS模式也支持动态热更新 Co-Authored-By: Claude Opus 4.6 --- config/settings.py | 4 ++-- main.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/config/settings.py b/config/settings.py index 9aab261..7e6746d 100644 --- a/config/settings.py +++ b/config/settings.py @@ -48,7 +48,7 @@ class CloudRedisConfig: """云端 Redis 配置(三层权威模型 - 云端层)""" host: str = "localhost" port: int = 6379 - db: int = 0 + db: int = 1 password: Optional[str] = None decode_responses: bool = True max_connections: int = 20 @@ -227,7 +227,7 @@ class Settings: self.cloud_redis = CloudRedisConfig( host=os.getenv("CLOUD_REDIS_HOST", "localhost"), port=int(os.getenv("CLOUD_REDIS_PORT", "6379")), - db=int(os.getenv("CLOUD_REDIS_DB", "0")), + db=int(os.getenv("CLOUD_REDIS_DB", "1")), password=os.getenv("CLOUD_REDIS_PASSWORD"), ) diff --git a/main.py b/main.py index fea1fd7..ff8edaa 100644 --- a/main.py +++ b/main.py @@ -96,7 +96,7 @@ class EdgeInferenceService: try: self._config_manager = get_config_sync_manager() self._config_manager.start_config_subscription() - if self._settings.config_sync_mode == "LOCAL" and self._config_manager: + if self._config_manager: def _on_config_update(topic, data): if self._algorithm_manager: # 保留状态地更新参数,避免告警重复 From a124edf8f965f2d90f9a567af9db52a167a2673b Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 15:48:32 +0800 Subject: [PATCH 05/11] =?UTF-8?q?fix(edge):=20EDGE=5FDEVICE=5FID=E4=BB=8Ee?= =?UTF-8?q?dge-001=E6=94=B9=E4=B8=BAedge=EF=BC=8C=E4=B8=8EWVP=E6=A0=87?= =?UTF-8?q?=E5=87=86=E5=80=BC=E4=B8=80=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Edge的config_sync过滤Stream事件时要求device_id匹配, WVP端标准device_id为"edge",此前不匹配导致Edge静默忽略 所有配置推送事件。 Co-Authored-By: Claude Opus 4.6 --- .env | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..796994d --- /dev/null +++ b/.env @@ -0,0 +1,23 @@ +# Local debug overrides +DEBUG_SERVER_HOST=0.0.0.0 + +# Alarm upload - cloud API URL (Service backend) +CLOUD_API_URL=http://localhost:8000 + +# ===== 边缘设备 ID(必须与 WVP 数据库 ROI 表的 device_id 一致) ===== +EDGE_DEVICE_ID=edge + +# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== +CONFIG_SYNC_MODE=REDIS + +# ===== 云端 Redis(截图处理器 + 配置同步,db=1) ===== +CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com +CLOUD_REDIS_PORT=24637 +CLOUD_REDIS_DB=1 +CLOUD_REDIS_PASSWORD=HkVZkVnn1 + +# ===== 腾讯云 COS(截图上传) ===== +COS_REGION=ap-shanghai +COS_BUCKET=xhwkzx-1-1389966313 +COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I +COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb From b2469c576c213a56f79b7efaa574135379db3da4 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sat, 28 Feb 2026 17:05:49 +0800 Subject: [PATCH 06/11] =?UTF-8?q?fix:=20=E7=A7=BB=E9=99=A4=20config=5Fsync?= =?UTF-8?q?=20=E6=A8=A1=E5=9D=97=E7=9A=84=E6=97=A5=E5=BF=97=E6=8A=91?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E6=81=A2=E5=A4=8D=E9=85=8D=E7=BD=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=97=A5=E5=BF=97=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit core.config_sync 被错误地加入 _QUIET_LOGGERS 列表,导致所有 INFO 级别 日志被抑制,无法看到 Redis 连接、Stream 监听、配置同步等关键运行日志。 Co-Authored-By: Claude Opus 4.6 --- utils/logger.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/utils/logger.py b/utils/logger.py index e8faa00..d178292 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -100,8 +100,6 @@ class StructuredLogger: # TensorRT 推理引擎 "core.tensorrt_engine", "tensorrt", - # 配置同步 - "core.config_sync", # 数据库 "config.database", # 算法管理器(注册、配置订阅) From 3ca6c934791dcbfc7de244f380ec67d9a1b213d5 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sun, 1 Mar 2026 19:58:58 +0800 Subject: [PATCH 07/11] =?UTF-8?q?fix(edge):=20=E9=85=8D=E7=BD=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=90=8E=E6=B8=85=E7=90=86=20SQLite=20=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E6=97=A7=E6=91=84=E5=83=8F=E5=A4=B4/ROI/=E7=BB=91?= =?UTF-8?q?=E5=AE=9A=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _sync_config_to_sqlite 增加旧数据清理逻辑 - 同步完成后删除不在本次推送列表中的摄像头、ROI 及关联算法绑定 - 仅在推送列表非空时执行清理,防止空配置误删所有数据 - 解决旧摄像头残留导致 Edge 加载过期配置的问题 Co-Authored-By: Claude Opus 4.6 --- core/config_sync.py | 63 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/core/config_sync.py b/core/config_sync.py index d2e4f0c..7bad3e8 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -528,10 +528,14 @@ class ConfigSyncManager: try: # 同步摄像头配置 cameras = config_data.get("cameras", []) + incoming_camera_ids = set() for cam in cameras: + cam_id = cam.get("camera_id", "") + if cam_id: + incoming_camera_ids.add(cam_id) try: self._db_manager.save_camera_config( - camera_id=cam.get("camera_id", ""), + camera_id=cam_id, rtsp_url=cam.get("rtsp_url", ""), camera_name=cam.get("camera_name", ""), enabled=cam.get("enabled", True), @@ -543,13 +547,16 @@ class ConfigSyncManager: # 同步 ROI 配置 rois = config_data.get("rois", []) + incoming_roi_ids = set() for roi in rois: if not isinstance(roi, dict): - logger.error(f"?? ROI ????: invalid roi item type={type(roi)}") + logger.error(f"无效 ROI 数据: invalid roi item type={type(roi)}") continue + roi_id = roi.get("roi_id", "") + if roi_id: + incoming_roi_ids.add(roi_id) try: coordinates = roi.get("coordinates", []) - # ?? rectangle dict ? polygon list-of-dict ?? if isinstance(coordinates, dict): coordinates = { "x": coordinates.get("x", 0), @@ -561,7 +568,7 @@ class ConfigSyncManager: coordinates = [[p.get("x", 0), p.get("y", 0)] for p in coordinates] self._db_manager.save_roi_config( - roi_id=roi.get("roi_id", ""), + roi_id=roi_id, camera_id=roi.get("camera_id", ""), roi_type=roi.get("roi_type", "polygon"), coordinates=coordinates, @@ -574,10 +581,14 @@ class ConfigSyncManager: # 同步算法绑定 binds = config_data.get("binds", []) + incoming_bind_ids = set() for bind in binds: + bind_id = bind.get("bind_id", "") + if bind_id: + incoming_bind_ids.add(bind_id) try: self._db_manager.save_roi_algo_bind( - bind_id=bind.get("bind_id", ""), + bind_id=bind_id, roi_id=bind.get("roi_id", ""), algo_code=bind.get("algo_code", ""), params=bind.get("params", {}), @@ -590,9 +601,51 @@ class ConfigSyncManager: logger.info(f"配置同步到 SQLite 完成: {count} 条记录") + # 清理 SQLite 中不在本次推送列表中的旧数据 + self._cleanup_stale_records(incoming_camera_ids, incoming_roi_ids, incoming_bind_ids) + except Exception as e: logger.error(f"配置同步到 SQLite 失败: {e}") + def _cleanup_stale_records(self, incoming_camera_ids: set, incoming_roi_ids: set, incoming_bind_ids: set): + """清理 SQLite 中不在本次推送列表中的旧记录""" + if not self._db_manager: + return + + # 仅在推送列表非空时执行清理,防止空配置误删所有数据 + if not incoming_camera_ids and not incoming_roi_ids: + logger.debug("推送列表为空,跳过旧数据清理") + return + + removed = 0 + try: + # 清理旧摄像头 + if incoming_camera_ids: + existing_cameras = self._db_manager.get_all_camera_configs() + for cam in existing_cameras: + old_id = cam.get("camera_id", "") + if old_id and old_id not in incoming_camera_ids: + self._db_manager.delete_camera_config(old_id) + removed += 1 + logger.info(f"清理旧摄像头: {old_id}") + + # 清理旧 ROI 及其关联的算法绑定 + if incoming_roi_ids: + existing_rois = self._db_manager.get_all_roi_configs() + for roi in existing_rois: + old_roi_id = roi.get("roi_id", "") + if old_roi_id and old_roi_id not in incoming_roi_ids: + bind_count = self._db_manager.delete_bindings_by_roi(old_roi_id) + self._db_manager.delete_roi_config(old_roi_id) + removed += 1 + bind_count + logger.info(f"清理旧 ROI: {old_roi_id} (含 {bind_count} 条算法绑定)") + + if removed > 0: + logger.info(f"旧数据清理完成: 共删除 {removed} 条过期记录") + + except Exception as e: + logger.error(f"清理旧数据失败: {e}") + # ==================== 回调管理 ==================== def register_callback(self, topic: str, callback: Callable): From d9d58dfafaefce669482360cdc24f622174114d0 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 09:39:14 +0800 Subject: [PATCH 08/11] =?UTF-8?q?feat(edge):=20=E6=88=AA=E5=9B=BE=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=B8=B4=E6=97=B6=20RTSP=20=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=EF=BC=8C=E8=A7=A3=E5=86=B3=E6=97=A0=20ROI=20=E6=91=84=E5=83=8F?= =?UTF-8?q?=E5=A4=B4=E6=97=A0=E6=B3=95=E6=88=AA=E5=9B=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _capture_frame 增加 rtsp_url 参数,优先走已有流,无流时降级 临时连接 RTSP 抓帧(_capture_ondemand),用完即释放。 提取 _encode_jpeg 公共方法。 Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 76 ++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 238a435..92421ea 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -181,11 +181,13 @@ class ScreenshotHandler: }) return + rtsp_url = fields.get("rtsp_url", "") + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", request_id, camera_code, callback_url or "(无)") # 1. 抓帧 - jpeg_bytes = self._capture_frame(camera_code) + jpeg_bytes = self._capture_frame(camera_code, rtsp_url) if jpeg_bytes is None: self._send_result(callback_url, request_id, camera_code, { "status": "error", @@ -217,36 +219,62 @@ class ScreenshotHandler: # ==================== 抓帧 ==================== - def _capture_frame(self, camera_code: str) -> Optional[bytes]: - """从 MultiStreamManager 获取最新帧并编码为 JPEG""" + def _capture_frame(self, camera_code: str, rtsp_url: str = "") -> Optional[bytes]: + """从 MultiStreamManager 获取最新帧,无流时降级临时 RTSP 连接""" + # 优先从已有视频流获取(有 ROI 的摄像头,已连接) stream = self._stream_manager.get_stream(camera_code) - if stream is None: - logger.warning("[截图] 未找到摄像头流: %s", camera_code) - return None + if stream is not None and stream.is_connected: + frame = stream.get_latest_frame(timeout=2.0) + if frame is None: + frame = stream.read(timeout=2.0) + if frame is not None: + return self._encode_jpeg(frame.image) - if not stream.is_connected: - logger.warning("[截图] 摄像头未连接: %s", camera_code) - return None + # 降级:临时 RTSP 连接抓帧(无 ROI 或流未连接的摄像头) + if rtsp_url: + return self._capture_ondemand(camera_code, rtsp_url) - frame = stream.get_latest_frame(timeout=2.0) - if frame is None: - # 回退:直接从缓冲区读一帧 - frame = stream.read(timeout=2.0) + logger.warning("[截图] 未找到摄像头流且无 rtsp_url: %s", camera_code) + return None - if frame is None: - logger.warning("[截图] 获取帧超时: %s", camera_code) - return None - - # JPEG 编码 + def _capture_ondemand(self, camera_code: str, rtsp_url: str) -> Optional[bytes]: + """临时连接 RTSP 抓取一帧,用完即断""" + cap = None try: - encode_params = [cv2.IMWRITE_JPEG_QUALITY, 85] - success, buffer = cv2.imencode(".jpg", frame.image, encode_params) - if not success: - logger.error("[截图] JPEG 编码失败: %s", camera_code) + logger.info("[截图] 临时连接 RTSP: %s → %s", camera_code, rtsp_url) + cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + if not cap.isOpened(): + logger.warning("[截图] 临时 RTSP 连接失败: %s", camera_code) return None - return buffer.tobytes() + + # 读取几帧丢弃(跳过关键帧解码延迟),取最后一帧 + frame = None + for _ in range(5): + ret, f = cap.read() + if ret and f is not None: + frame = f + + if frame is None: + logger.warning("[截图] 临时连接读帧失败: %s", camera_code) + return None + + return self._encode_jpeg(frame) except Exception as e: - logger.error("[截图] 帧编码异常: %s", e) + logger.error("[截图] 临时截图异常: %s, %s", camera_code, e) + return None + finally: + if cap is not None: + cap.release() + logger.debug("[截图] 临时 RTSP 连接已释放: %s", camera_code) + + def _encode_jpeg(self, image) -> Optional[bytes]: + """将帧编码为 JPEG 字节""" + try: + success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, 85]) + return buffer.tobytes() if success else None + except Exception as e: + logger.error("[截图] JPEG 编码失败: %s", e) return None # ==================== COS 上传 ==================== From d71d5da7401121fc5df79616f2899c47cfdf32d4 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 09:58:34 +0800 Subject: [PATCH 09/11] =?UTF-8?q?fix(edge):=20=E5=91=8A=E8=AD=A6=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5=E6=94=B9=E6=8C=87=20WVP=20=E7=AB=AF=E7=82=B9=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=BF=9E=E6=8E=A5=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CLOUD_API_URL 改为 WVP 地址 (http://124.221.55.225:18080) - 告警路径从 /admin-api/aiot/alarm/edge/* 改为 /api/ai/alert/edge/* - 适配 WVP 新增的 report/resolve 端点 Co-Authored-By: Claude Opus 4.6 --- .env | 4 ++-- core/alarm_upload_worker.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.env b/.env index 796994d..5717f51 100644 --- a/.env +++ b/.env @@ -1,8 +1,8 @@ # Local debug overrides DEBUG_SERVER_HOST=0.0.0.0 -# Alarm upload - cloud API URL (Service backend) -CLOUD_API_URL=http://localhost:8000 +# Alarm upload - cloud API URL (WVP backend) +CLOUD_API_URL=http://124.221.55.225:18080 # ===== 边缘设备 ID(必须与 WVP 数据库 ROI 表的 device_id 一致) ===== EDGE_DEVICE_ID=edge diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 66c4016..3156a17 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -100,7 +100,7 @@ class AlarmUploadWorker: upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") health_url = f"{base_url}/health" - report_url = f"{base_url}/admin-api/aiot/alarm/edge/report" + report_url = f"{base_url}/api/ai/alert/edge/report" self._logger.info(f"云端 API 地址: {base_url}") self._logger.info(f"告警上报端点: {report_url}") @@ -250,7 +250,7 @@ class AlarmUploadWorker: """处理告警结束事件 - HTTP POST 到云端""" upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") - url = f"{base_url}/admin-api/aiot/alarm/edge/resolve" + url = f"{base_url}/api/ai/alert/edge/resolve" headers = {"Content-Type": "application/json"} if upload_cfg.edge_token: @@ -342,7 +342,7 @@ class AlarmUploadWorker: """ upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") - url = f"{base_url}/admin-api/aiot/alarm/edge/report" + url = f"{base_url}/api/ai/alert/edge/report" headers = { "Content-Type": "application/json", From d9f78e0b48e936065e1fcdf070742ef9379e58dd Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 14:03:12 +0800 Subject: [PATCH 10/11] =?UTF-8?q?refactor(edge):=20=E6=88=AA=E5=9B=BE?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E4=BF=9D=E5=AD=98=E6=9C=AC=E5=9C=B0=EF=BC=8C?= =?UTF-8?q?=E7=9B=B4=E6=8E=A5=E7=BC=96=E7=A0=81=E4=B8=BAbase64=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0COS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ResultReporter: 截图通过 cv2.imencode 编码为 JPEG base64, 直接放入 Redis 消息,不再调用 ImageStorageManager 保存本地文件 - AlarmUploadWorker: 从 Redis 读取 base64 解码为字节流, 使用 put_object(Body=bytes) 直接上传 COS,移除 local: 回退逻辑 - 移除 AlarmInfo.snapshot_local_path,改为 snapshot_b64 - COS 未配置时返回 None 进入重试(不再静默回退本地路径) Co-Authored-By: Claude Opus 4.6 --- core/alarm_upload_worker.py | 58 +++++++++++++------------------------ core/result_reporter.py | 42 +++++++++------------------ 2 files changed, 33 insertions(+), 67 deletions(-) diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 3156a17..2637d16 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -15,7 +15,6 @@ Redis Key 设计: import json import logging -import os import threading import time from datetime import datetime, timezone @@ -183,21 +182,16 @@ class AlarmUploadWorker: self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})") - # Step 1: 上传截图到 COS - snapshot_local_path = alarm_data.get("snapshot_local_path") + # Step 1: 上传截图到 COS(从 base64 解码后直接上传字节流) + snapshot_b64 = alarm_data.get("snapshot_b64") object_key = None - if snapshot_local_path: - # 截图是异步保存的,等待文件写入完成(最多 3 秒) - if not os.path.exists(snapshot_local_path): - for _ in range(6): - time.sleep(0.5) - if os.path.exists(snapshot_local_path): - break - - if os.path.exists(snapshot_local_path): + if snapshot_b64: + try: + import base64 + image_bytes = base64.b64decode(snapshot_b64) object_key = self._upload_snapshot_to_cos( - snapshot_local_path, + image_bytes, alarm_id, alarm_data.get("device_id", "unknown"), ) @@ -205,17 +199,12 @@ class AlarmUploadWorker: # COS 上传失败,进入重试 self._handle_retry(alarm_json, "COS 上传失败") return - elif object_key == "": - # COS 未配置,使用本地截图路径作为回退 - captures_base = os.path.join("data", "captures") - rel_path = os.path.relpath(snapshot_local_path, captures_base) - rel_path = rel_path.replace("\\", "/") - object_key = f"local:{rel_path}" - self._logger.info(f"使用本地截图路径: {object_key}") - else: - self._logger.warning(f"截图文件不存在: {snapshot_local_path}") + except Exception as e: + self._logger.error(f"截图解码/上传失败: {e}") + self._handle_retry(alarm_json, f"截图处理失败: {e}") + return - # Step 2: HTTP 上报告警元数据 + # Step 2: HTTP 上报告警元数据(不含截图二进制数据) report_data = { "alarm_id": alarm_data.get("alarm_id"), "alarm_type": alarm_data.get("alarm_type"), @@ -234,14 +223,6 @@ class AlarmUploadWorker: if success: self._stats["processed"] += 1 self._logger.info(f"告警上报成功: {alarm_id}") - - # 仅在 COS 上传成功时删除本地截图;本地回退模式(local:)不删除 - if snapshot_local_path and os.path.exists(snapshot_local_path) and object_key and not object_key.startswith("local:"): - try: - os.remove(snapshot_local_path) - self._logger.debug(f"已删除本地截图: {snapshot_local_path}") - except Exception as e: - self._logger.warning(f"删除本地截图失败: {e}") else: # HTTP 上报失败,进入重试 self._handle_retry(alarm_json, "HTTP 上报失败") @@ -277,13 +258,13 @@ class AlarmUploadWorker: self._logger.warning(f"告警结束上报异常: {e}") def _upload_snapshot_to_cos( - self, local_path: str, alarm_id: str, device_id: str + self, image_bytes: bytes, alarm_id: str, device_id: str ) -> Optional[str]: """ - 上传截图到腾讯云 COS + 上传截图到腾讯云 COS(直接从内存字节流上传) Args: - local_path: 本地截图路径 + image_bytes: JPEG 图片字节 alarm_id: 告警ID device_id: 设备ID @@ -292,8 +273,8 @@ class AlarmUploadWorker: """ cos_cfg = self._settings.cos if not cos_cfg.secret_id or not cos_cfg.bucket: - self._logger.warning("COS 未配置,跳过截图上传") - return "" + self._logger.error("COS 未配置(缺少 secret_id 或 bucket),无法上传截图") + return None # 懒初始化 COS 客户端 if self._cos_client is None: @@ -317,10 +298,11 @@ class AlarmUploadWorker: object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg" try: - self._cos_client.put_object_from_local_file( + self._cos_client.put_object( Bucket=cos_cfg.bucket, - LocalFilePath=local_path, + Body=image_bytes, Key=object_key, + ContentType="image/jpeg", ) self._stats["cos_uploaded"] += 1 self._logger.info(f"COS 上传成功: {object_key}") diff --git a/core/result_reporter.py b/core/result_reporter.py index 53a3e53..c2af8e3 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -30,7 +30,7 @@ class AlarmInfo: scene_id: str event_time: str # ISO8601 alarm_level: int # 1-4 - snapshot_local_path: Optional[str] = None + snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图 algorithm_code: Optional[str] = None confidence_score: Optional[float] = None ext_data: Optional[Dict[str, Any]] = field(default_factory=dict) @@ -44,7 +44,7 @@ class AlarmInfo: "scene_id": self.scene_id, "event_time": self.event_time, "alarm_level": self.alarm_level, - "snapshot_local_path": self.snapshot_local_path, + "snapshot_b64": self.snapshot_b64, "algorithm_code": self.algorithm_code, "confidence_score": self.confidence_score, "ext_data": self.ext_data, @@ -85,21 +85,12 @@ class ResultReporter: "queue_failures": 0, } - # 图片存储(本地保存截图供 worker 读取上传) - self._image_storage = None self._db_manager = None self._logger.info("ResultReporter 初始化完成(Redis 缓冲模式)") def initialize(self): - """初始化 Redis 连接和本地存储""" - # 初始化本地存储(截图保存) - try: - from core.storage_manager import get_image_storage - self._image_storage = get_image_storage() - except Exception as e: - self._logger.warning(f"本地图片存储初始化失败: {e}") - + """初始化 Redis 连接""" try: from config.database import get_sqlite_manager self._db_manager = get_sqlite_manager() @@ -138,19 +129,18 @@ class ResultReporter: """ self._performance_stats["alerts_generated"] += 1 - # 保存截图到本地,获取本地路径 - if screenshot is not None and self._image_storage: + # 将截图编码为 JPEG base64,直接通过 Redis 传递给 Worker 上传 COS + if screenshot is not None: try: - local_path = self._image_storage.save_capture( - image=screenshot, - camera_id=alarm_info.device_id, - alert_id=alarm_info.alarm_id, - timestamp=datetime.now(), - ) - if local_path: - alarm_info.snapshot_local_path = local_path + import cv2 + import base64 + success, buffer = cv2.imencode('.jpg', screenshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if success: + alarm_info.snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('ascii') + else: + self._logger.warning("截图 JPEG 编码失败") except Exception as e: - self._logger.error(f"保存截图失败: {e}") + self._logger.error(f"编码截图失败: {e}") # 写入 Redis 队列 if self._redis is None: @@ -209,12 +199,6 @@ class ResultReporter: """关闭上报器""" self._logger.info("ResultReporter 资源清理") - if self._image_storage: - try: - self._image_storage.close() - except Exception: - pass - if self._db_manager: try: self._db_manager.close() From bdbfca42524676d2d347e73e6f4f69aefccb936f Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 16:49:20 +0800 Subject: [PATCH 11/11] =?UTF-8?q?fix(edge):=20=E4=BF=AE=E5=A4=8D=20batch?= =?UTF-8?q?=20=E6=8E=A8=E7=90=86=E8=B6=85=E8=BF=87=20MAX=5FBATCH=5FSIZE=20?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E7=BC=93=E5=86=B2=E5=8C=BA=E6=BA=A2=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 队列中 ROI 数量超过 8 时(多摄像头多 ROI 绑定场景), 一次性送入 TensorRT 引擎导致 np.copyto 溢出。 改为按 max_batch_size 分块推理。 Co-Authored-By: Claude Opus 4.6 --- main.py | 88 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/main.py b/main.py index ff8edaa..149f1e3 100644 --- a/main.py +++ b/main.py @@ -562,59 +562,63 @@ class EdgeInferenceService: self._logger.error(f"处理帧失败 {camera_id}: {e}") def _batch_process_rois(self): - """批量处理 ROI - 真正的 batch 推理""" + """批量处理 ROI - 真正的 batch 推理(按 max_batch_size 分块)""" with self._batch_lock: roi_items = self._batch_roi_queue if not roi_items: return self._batch_roi_queue = [] - try: - images = [item[4] for item in roi_items] - scale_infos = [item[5] for item in roi_items] + engine = self._engine_manager.get_engine("default") + if engine is None: + return - # 真正的 batch: 将所有 ROI 裁剪图拼成 [N,3,H,W] 一次推理 - batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch( - images - ) + # 按 max_batch_size 分块处理 + for chunk_start in range(0, len(roi_items), self._max_batch_size): + chunk = roi_items[chunk_start:chunk_start + self._max_batch_size] - engine = self._engine_manager.get_engine("default") - if engine is None: - return + try: + images = [item[4] for item in chunk] + scale_infos = [item[5] for item in chunk] - # 一次性推理整个 batch - outputs, inference_time_ms = engine.infer(batch_data) - - # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) - import numpy as np - if isinstance(outputs, np.ndarray): - self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") - elif isinstance(outputs, (list, tuple)): - shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs] - self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") - - batch_size = len(roi_items) - batch_results = self._postprocessor.batch_process_detections( - outputs, - batch_size, - conf_threshold=self._settings.inference.conf_threshold - ) - - total_detections = sum(len(r[0]) for r in batch_results) - self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") - - for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(roi_items): - boxes, scores, class_ids = batch_results[idx] - - # 无论是否检测到目标都要调用算法(离岗检测需要"无人"信号) - self._handle_detections( - camera_id, roi, bind, frame, - boxes, scores, class_ids, - scale_info + # 真正的 batch: 将所有 ROI 裁剪图拼成 [N,3,H,W] 一次推理 + batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch( + images ) - except Exception as e: - self._logger.error(f"批量处理 ROI 失败: {e}") + # 一次性推理整个 batch + outputs, inference_time_ms = engine.infer(batch_data) + + # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) + import numpy as np + if isinstance(outputs, np.ndarray): + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") + elif isinstance(outputs, (list, tuple)): + shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs] + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") + + batch_size = len(chunk) + batch_results = self._postprocessor.batch_process_detections( + outputs, + batch_size, + conf_threshold=self._settings.inference.conf_threshold + ) + + total_detections = sum(len(r[0]) for r in batch_results) + self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") + + for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(chunk): + boxes, scores, class_ids = batch_results[idx] + + # 无论是否检测到目标都要调用算法(离岗检测需要"无人"信号) + self._handle_detections( + camera_id, roi, bind, frame, + boxes, scores, class_ids, + scale_info + ) + + except Exception as e: + self._logger.error(f"批量处理 ROI 失败: {e}") def _process_roi_frame(self, camera_id: str, frame: VideoFrame, roi): """收集 ROI 帧数据 - 已集成到 _process_frame 中"""