diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 3156a17..2637d16 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -15,7 +15,6 @@ Redis Key 设计: import json import logging -import os import threading import time from datetime import datetime, timezone @@ -183,21 +182,16 @@ class AlarmUploadWorker: self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})") - # Step 1: 上传截图到 COS - snapshot_local_path = alarm_data.get("snapshot_local_path") + # Step 1: 上传截图到 COS(从 base64 解码后直接上传字节流) + snapshot_b64 = alarm_data.get("snapshot_b64") object_key = None - if snapshot_local_path: - # 截图是异步保存的,等待文件写入完成(最多 3 秒) - if not os.path.exists(snapshot_local_path): - for _ in range(6): - time.sleep(0.5) - if os.path.exists(snapshot_local_path): - break - - if os.path.exists(snapshot_local_path): + if snapshot_b64: + try: + import base64 + image_bytes = base64.b64decode(snapshot_b64) object_key = self._upload_snapshot_to_cos( - snapshot_local_path, + image_bytes, alarm_id, alarm_data.get("device_id", "unknown"), ) @@ -205,17 +199,12 @@ class AlarmUploadWorker: # COS 上传失败,进入重试 self._handle_retry(alarm_json, "COS 上传失败") return - elif object_key == "": - # COS 未配置,使用本地截图路径作为回退 - captures_base = os.path.join("data", "captures") - rel_path = os.path.relpath(snapshot_local_path, captures_base) - rel_path = rel_path.replace("\\", "/") - object_key = f"local:{rel_path}" - self._logger.info(f"使用本地截图路径: {object_key}") - else: - self._logger.warning(f"截图文件不存在: {snapshot_local_path}") + except Exception as e: + self._logger.error(f"截图解码/上传失败: {e}") + self._handle_retry(alarm_json, f"截图处理失败: {e}") + return - # Step 2: HTTP 上报告警元数据 + # Step 2: HTTP 上报告警元数据(不含截图二进制数据) report_data = { "alarm_id": alarm_data.get("alarm_id"), "alarm_type": alarm_data.get("alarm_type"), @@ -234,14 +223,6 @@ class AlarmUploadWorker: if success: self._stats["processed"] += 1 self._logger.info(f"告警上报成功: {alarm_id}") - - # 仅在 COS 上传成功时删除本地截图;本地回退模式(local:)不删除 - if snapshot_local_path and os.path.exists(snapshot_local_path) and object_key and not object_key.startswith("local:"): - try: - os.remove(snapshot_local_path) - self._logger.debug(f"已删除本地截图: {snapshot_local_path}") - except Exception as e: - self._logger.warning(f"删除本地截图失败: {e}") else: # HTTP 上报失败,进入重试 self._handle_retry(alarm_json, "HTTP 上报失败") @@ -277,13 +258,13 @@ class AlarmUploadWorker: self._logger.warning(f"告警结束上报异常: {e}") def _upload_snapshot_to_cos( - self, local_path: str, alarm_id: str, device_id: str + self, image_bytes: bytes, alarm_id: str, device_id: str ) -> Optional[str]: """ - 上传截图到腾讯云 COS + 上传截图到腾讯云 COS(直接从内存字节流上传) Args: - local_path: 本地截图路径 + image_bytes: JPEG 图片字节 alarm_id: 告警ID device_id: 设备ID @@ -292,8 +273,8 @@ class AlarmUploadWorker: """ cos_cfg = self._settings.cos if not cos_cfg.secret_id or not cos_cfg.bucket: - self._logger.warning("COS 未配置,跳过截图上传") - return "" + self._logger.error("COS 未配置(缺少 secret_id 或 bucket),无法上传截图") + return None # 懒初始化 COS 客户端 if self._cos_client is None: @@ -317,10 +298,11 @@ class AlarmUploadWorker: object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg" try: - self._cos_client.put_object_from_local_file( + self._cos_client.put_object( Bucket=cos_cfg.bucket, - LocalFilePath=local_path, + Body=image_bytes, Key=object_key, + ContentType="image/jpeg", ) self._stats["cos_uploaded"] += 1 self._logger.info(f"COS 上传成功: {object_key}") diff --git a/core/result_reporter.py b/core/result_reporter.py index 53a3e53..c2af8e3 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -30,7 +30,7 @@ class AlarmInfo: scene_id: str event_time: str # ISO8601 alarm_level: int # 1-4 - snapshot_local_path: Optional[str] = None + snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图 algorithm_code: Optional[str] = None confidence_score: Optional[float] = None ext_data: Optional[Dict[str, Any]] = field(default_factory=dict) @@ -44,7 +44,7 @@ class AlarmInfo: "scene_id": self.scene_id, "event_time": self.event_time, "alarm_level": self.alarm_level, - "snapshot_local_path": self.snapshot_local_path, + "snapshot_b64": self.snapshot_b64, "algorithm_code": self.algorithm_code, "confidence_score": self.confidence_score, "ext_data": self.ext_data, @@ -85,21 +85,12 @@ class ResultReporter: "queue_failures": 0, } - # 图片存储(本地保存截图供 worker 读取上传) - self._image_storage = None self._db_manager = None self._logger.info("ResultReporter 初始化完成(Redis 缓冲模式)") def initialize(self): - """初始化 Redis 连接和本地存储""" - # 初始化本地存储(截图保存) - try: - from core.storage_manager import get_image_storage - self._image_storage = get_image_storage() - except Exception as e: - self._logger.warning(f"本地图片存储初始化失败: {e}") - + """初始化 Redis 连接""" try: from config.database import get_sqlite_manager self._db_manager = get_sqlite_manager() @@ -138,19 +129,18 @@ class ResultReporter: """ self._performance_stats["alerts_generated"] += 1 - # 保存截图到本地,获取本地路径 - if screenshot is not None and self._image_storage: + # 将截图编码为 JPEG base64,直接通过 Redis 传递给 Worker 上传 COS + if screenshot is not None: try: - local_path = self._image_storage.save_capture( - image=screenshot, - camera_id=alarm_info.device_id, - alert_id=alarm_info.alarm_id, - timestamp=datetime.now(), - ) - if local_path: - alarm_info.snapshot_local_path = local_path + import cv2 + import base64 + success, buffer = cv2.imencode('.jpg', screenshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if success: + alarm_info.snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('ascii') + else: + self._logger.warning("截图 JPEG 编码失败") except Exception as e: - self._logger.error(f"保存截图失败: {e}") + self._logger.error(f"编码截图失败: {e}") # 写入 Redis 队列 if self._redis is None: @@ -209,12 +199,6 @@ class ResultReporter: """关闭上报器""" self._logger.info("ResultReporter 资源清理") - if self._image_storage: - try: - self._image_storage.close() - except Exception: - pass - if self._db_manager: try: self._db_manager.close()