diff --git a/.env b/.env new file mode 100644 index 0000000..5717f51 --- /dev/null +++ b/.env @@ -0,0 +1,23 @@ +# Local debug overrides +DEBUG_SERVER_HOST=0.0.0.0 + +# Alarm upload - cloud API URL (WVP backend) +CLOUD_API_URL=http://124.221.55.225:18080 + +# ===== 边缘设备 ID(必须与 WVP 数据库 ROI 表的 device_id 一致) ===== +EDGE_DEVICE_ID=edge + +# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== +CONFIG_SYNC_MODE=REDIS + +# ===== 云端 Redis(截图处理器 + 配置同步,db=1) ===== +CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com +CLOUD_REDIS_PORT=24637 +CLOUD_REDIS_DB=1 +CLOUD_REDIS_PASSWORD=HkVZkVnn1 + +# ===== 腾讯云 COS(截图上传) ===== +COS_REGION=ap-shanghai +COS_BUCKET=xhwkzx-1-1389966313 +COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I +COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb diff --git a/config/settings.py b/config/settings.py index 9aab261..7e6746d 100644 --- a/config/settings.py +++ b/config/settings.py @@ -48,7 +48,7 @@ class CloudRedisConfig: """云端 Redis 配置(三层权威模型 - 云端层)""" host: str = "localhost" port: int = 6379 - db: int = 0 + db: int = 1 password: Optional[str] = None decode_responses: bool = True max_connections: int = 20 @@ -227,7 +227,7 @@ class Settings: self.cloud_redis = CloudRedisConfig( host=os.getenv("CLOUD_REDIS_HOST", "localhost"), port=int(os.getenv("CLOUD_REDIS_PORT", "6379")), - db=int(os.getenv("CLOUD_REDIS_DB", "0")), + db=int(os.getenv("CLOUD_REDIS_DB", "1")), password=os.getenv("CLOUD_REDIS_PASSWORD"), ) diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 66c4016..2637d16 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -15,7 +15,6 @@ Redis Key 设计: import json import logging -import os import threading import time from datetime import datetime, timezone @@ -100,7 +99,7 @@ class AlarmUploadWorker: upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") health_url = f"{base_url}/health" - report_url = f"{base_url}/admin-api/aiot/alarm/edge/report" + report_url = f"{base_url}/api/ai/alert/edge/report" self._logger.info(f"云端 API 地址: {base_url}") self._logger.info(f"告警上报端点: {report_url}") @@ -183,21 +182,16 @@ class AlarmUploadWorker: self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})") - # Step 1: 上传截图到 COS - snapshot_local_path = alarm_data.get("snapshot_local_path") + # Step 1: 上传截图到 COS(从 base64 解码后直接上传字节流) + snapshot_b64 = alarm_data.get("snapshot_b64") object_key = None - if snapshot_local_path: - # 截图是异步保存的,等待文件写入完成(最多 3 秒) - if not os.path.exists(snapshot_local_path): - for _ in range(6): - time.sleep(0.5) - if os.path.exists(snapshot_local_path): - break - - if os.path.exists(snapshot_local_path): + if snapshot_b64: + try: + import base64 + image_bytes = base64.b64decode(snapshot_b64) object_key = self._upload_snapshot_to_cos( - snapshot_local_path, + image_bytes, alarm_id, alarm_data.get("device_id", "unknown"), ) @@ -205,17 +199,12 @@ class AlarmUploadWorker: # COS 上传失败,进入重试 self._handle_retry(alarm_json, "COS 上传失败") return - elif object_key == "": - # COS 未配置,使用本地截图路径作为回退 - captures_base = os.path.join("data", "captures") - rel_path = os.path.relpath(snapshot_local_path, captures_base) - rel_path = rel_path.replace("\\", "/") - object_key = f"local:{rel_path}" - self._logger.info(f"使用本地截图路径: {object_key}") - else: - self._logger.warning(f"截图文件不存在: {snapshot_local_path}") + except Exception as e: + self._logger.error(f"截图解码/上传失败: {e}") + self._handle_retry(alarm_json, f"截图处理失败: {e}") + return - # Step 2: HTTP 上报告警元数据 + # Step 2: HTTP 上报告警元数据(不含截图二进制数据) report_data = { "alarm_id": alarm_data.get("alarm_id"), "alarm_type": alarm_data.get("alarm_type"), @@ -234,14 +223,6 @@ class AlarmUploadWorker: if success: self._stats["processed"] += 1 self._logger.info(f"告警上报成功: {alarm_id}") - - # 仅在 COS 上传成功时删除本地截图;本地回退模式(local:)不删除 - if snapshot_local_path and os.path.exists(snapshot_local_path) and object_key and not object_key.startswith("local:"): - try: - os.remove(snapshot_local_path) - self._logger.debug(f"已删除本地截图: {snapshot_local_path}") - except Exception as e: - self._logger.warning(f"删除本地截图失败: {e}") else: # HTTP 上报失败,进入重试 self._handle_retry(alarm_json, "HTTP 上报失败") @@ -250,7 +231,7 @@ class AlarmUploadWorker: """处理告警结束事件 - HTTP POST 到云端""" upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") - url = f"{base_url}/admin-api/aiot/alarm/edge/resolve" + url = f"{base_url}/api/ai/alert/edge/resolve" headers = {"Content-Type": "application/json"} if upload_cfg.edge_token: @@ -277,13 +258,13 @@ class AlarmUploadWorker: self._logger.warning(f"告警结束上报异常: {e}") def _upload_snapshot_to_cos( - self, local_path: str, alarm_id: str, device_id: str + self, image_bytes: bytes, alarm_id: str, device_id: str ) -> Optional[str]: """ - 上传截图到腾讯云 COS + 上传截图到腾讯云 COS(直接从内存字节流上传) Args: - local_path: 本地截图路径 + image_bytes: JPEG 图片字节 alarm_id: 告警ID device_id: 设备ID @@ -292,8 +273,8 @@ class AlarmUploadWorker: """ cos_cfg = self._settings.cos if not cos_cfg.secret_id or not cos_cfg.bucket: - self._logger.warning("COS 未配置,跳过截图上传") - return "" + self._logger.error("COS 未配置(缺少 secret_id 或 bucket),无法上传截图") + return None # 懒初始化 COS 客户端 if self._cos_client is None: @@ -317,10 +298,11 @@ class AlarmUploadWorker: object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg" try: - self._cos_client.put_object_from_local_file( + self._cos_client.put_object( Bucket=cos_cfg.bucket, - LocalFilePath=local_path, + Body=image_bytes, Key=object_key, + ContentType="image/jpeg", ) self._stats["cos_uploaded"] += 1 self._logger.info(f"COS 上传成功: {object_key}") @@ -342,7 +324,7 @@ class AlarmUploadWorker: """ upload_cfg = self._settings.alarm_upload base_url = upload_cfg.cloud_api_url.rstrip("/") - url = f"{base_url}/admin-api/aiot/alarm/edge/report" + url = f"{base_url}/api/ai/alert/edge/report" headers = { "Content-Type": "application/json", diff --git a/core/config_sync.py b/core/config_sync.py index d2e4f0c..7bad3e8 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -528,10 +528,14 @@ class ConfigSyncManager: try: # 同步摄像头配置 cameras = config_data.get("cameras", []) + incoming_camera_ids = set() for cam in cameras: + cam_id = cam.get("camera_id", "") + if cam_id: + incoming_camera_ids.add(cam_id) try: self._db_manager.save_camera_config( - camera_id=cam.get("camera_id", ""), + camera_id=cam_id, rtsp_url=cam.get("rtsp_url", ""), camera_name=cam.get("camera_name", ""), enabled=cam.get("enabled", True), @@ -543,13 +547,16 @@ class ConfigSyncManager: # 同步 ROI 配置 rois = config_data.get("rois", []) + incoming_roi_ids = set() for roi in rois: if not isinstance(roi, dict): - logger.error(f"?? ROI ????: invalid roi item type={type(roi)}") + logger.error(f"无效 ROI 数据: invalid roi item type={type(roi)}") continue + roi_id = roi.get("roi_id", "") + if roi_id: + incoming_roi_ids.add(roi_id) try: coordinates = roi.get("coordinates", []) - # ?? rectangle dict ? polygon list-of-dict ?? if isinstance(coordinates, dict): coordinates = { "x": coordinates.get("x", 0), @@ -561,7 +568,7 @@ class ConfigSyncManager: coordinates = [[p.get("x", 0), p.get("y", 0)] for p in coordinates] self._db_manager.save_roi_config( - roi_id=roi.get("roi_id", ""), + roi_id=roi_id, camera_id=roi.get("camera_id", ""), roi_type=roi.get("roi_type", "polygon"), coordinates=coordinates, @@ -574,10 +581,14 @@ class ConfigSyncManager: # 同步算法绑定 binds = config_data.get("binds", []) + incoming_bind_ids = set() for bind in binds: + bind_id = bind.get("bind_id", "") + if bind_id: + incoming_bind_ids.add(bind_id) try: self._db_manager.save_roi_algo_bind( - bind_id=bind.get("bind_id", ""), + bind_id=bind_id, roi_id=bind.get("roi_id", ""), algo_code=bind.get("algo_code", ""), params=bind.get("params", {}), @@ -590,9 +601,51 @@ class ConfigSyncManager: logger.info(f"配置同步到 SQLite 完成: {count} 条记录") + # 清理 SQLite 中不在本次推送列表中的旧数据 + self._cleanup_stale_records(incoming_camera_ids, incoming_roi_ids, incoming_bind_ids) + except Exception as e: logger.error(f"配置同步到 SQLite 失败: {e}") + def _cleanup_stale_records(self, incoming_camera_ids: set, incoming_roi_ids: set, incoming_bind_ids: set): + """清理 SQLite 中不在本次推送列表中的旧记录""" + if not self._db_manager: + return + + # 仅在推送列表非空时执行清理,防止空配置误删所有数据 + if not incoming_camera_ids and not incoming_roi_ids: + logger.debug("推送列表为空,跳过旧数据清理") + return + + removed = 0 + try: + # 清理旧摄像头 + if incoming_camera_ids: + existing_cameras = self._db_manager.get_all_camera_configs() + for cam in existing_cameras: + old_id = cam.get("camera_id", "") + if old_id and old_id not in incoming_camera_ids: + self._db_manager.delete_camera_config(old_id) + removed += 1 + logger.info(f"清理旧摄像头: {old_id}") + + # 清理旧 ROI 及其关联的算法绑定 + if incoming_roi_ids: + existing_rois = self._db_manager.get_all_roi_configs() + for roi in existing_rois: + old_roi_id = roi.get("roi_id", "") + if old_roi_id and old_roi_id not in incoming_roi_ids: + bind_count = self._db_manager.delete_bindings_by_roi(old_roi_id) + self._db_manager.delete_roi_config(old_roi_id) + removed += 1 + bind_count + logger.info(f"清理旧 ROI: {old_roi_id} (含 {bind_count} 条算法绑定)") + + if removed > 0: + logger.info(f"旧数据清理完成: 共删除 {removed} 条过期记录") + + except Exception as e: + logger.error(f"清理旧数据失败: {e}") + # ==================== 回调管理 ==================== def register_callback(self, topic: str, callback: Callable): diff --git a/core/result_reporter.py b/core/result_reporter.py index 53a3e53..c2af8e3 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -30,7 +30,7 @@ class AlarmInfo: scene_id: str event_time: str # ISO8601 alarm_level: int # 1-4 - snapshot_local_path: Optional[str] = None + snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图 algorithm_code: Optional[str] = None confidence_score: Optional[float] = None ext_data: Optional[Dict[str, Any]] = field(default_factory=dict) @@ -44,7 +44,7 @@ class AlarmInfo: "scene_id": self.scene_id, "event_time": self.event_time, "alarm_level": self.alarm_level, - "snapshot_local_path": self.snapshot_local_path, + "snapshot_b64": self.snapshot_b64, "algorithm_code": self.algorithm_code, "confidence_score": self.confidence_score, "ext_data": self.ext_data, @@ -85,21 +85,12 @@ class ResultReporter: "queue_failures": 0, } - # 图片存储(本地保存截图供 worker 读取上传) - self._image_storage = None self._db_manager = None self._logger.info("ResultReporter 初始化完成(Redis 缓冲模式)") def initialize(self): - """初始化 Redis 连接和本地存储""" - # 初始化本地存储(截图保存) - try: - from core.storage_manager import get_image_storage - self._image_storage = get_image_storage() - except Exception as e: - self._logger.warning(f"本地图片存储初始化失败: {e}") - + """初始化 Redis 连接""" try: from config.database import get_sqlite_manager self._db_manager = get_sqlite_manager() @@ -138,19 +129,18 @@ class ResultReporter: """ self._performance_stats["alerts_generated"] += 1 - # 保存截图到本地,获取本地路径 - if screenshot is not None and self._image_storage: + # 将截图编码为 JPEG base64,直接通过 Redis 传递给 Worker 上传 COS + if screenshot is not None: try: - local_path = self._image_storage.save_capture( - image=screenshot, - camera_id=alarm_info.device_id, - alert_id=alarm_info.alarm_id, - timestamp=datetime.now(), - ) - if local_path: - alarm_info.snapshot_local_path = local_path + import cv2 + import base64 + success, buffer = cv2.imencode('.jpg', screenshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if success: + alarm_info.snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('ascii') + else: + self._logger.warning("截图 JPEG 编码失败") except Exception as e: - self._logger.error(f"保存截图失败: {e}") + self._logger.error(f"编码截图失败: {e}") # 写入 Redis 队列 if self._redis is None: @@ -209,12 +199,6 @@ class ResultReporter: """关闭上报器""" self._logger.info("ResultReporter 资源清理") - if self._image_storage: - try: - self._image_storage.close() - except Exception: - pass - if self._db_manager: try: self._db_manager.close() diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py new file mode 100644 index 0000000..92421ea --- /dev/null +++ b/core/screenshot_handler.py @@ -0,0 +1,348 @@ +""" +截图处理模块 - 监听云端 Redis Stream 截图请求,截图后上传 COS + +数据流: + WVP → XADD edge_snap_request → Edge 消费 → capture frame → upload COS → HTTP 回调 WVP + (回调失败时降级写 Redis snap:result:{id}) +""" + +import io +import json +import logging +import threading +import time +import uuid +from typing import Optional + +import cv2 +import numpy as np +import requests + +from config.settings import get_settings, COSConfig + +logger = logging.getLogger(__name__) + +# Redis 常量 +SNAP_REQUEST_STREAM = "edge_snap_request" +SNAP_CONSUMER_GROUP = "edge_snap_group" +SNAP_CONSUMER_NAME = "edge_snap_worker" +SNAP_RESULT_KEY_PREFIX = "snap:result:" +SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期 + +# HTTP 回调 +SNAP_CALLBACK_PATH = "/api/ai/roi/snap/callback" +SNAP_CALLBACK_TIMEOUT = 5 # 回调 HTTP 超时(秒) + + +class ScreenshotHandler: + """截图处理器 + + 从云端 Redis Stream 消费截图请求,通过 MultiStreamManager 获取最新帧, + JPEG 编码后直传 COS,最后通过 HTTP 回调 WVP 返回结果 URL。 + 回调失败时降级写 Redis。 + """ + + def __init__(self, cloud_redis, stream_manager): + """ + Args: + cloud_redis: 云端 Redis 客户端 (redis.Redis) + stream_manager: MultiStreamManager 实例 + """ + self._cloud_redis = cloud_redis + self._stream_manager = stream_manager + self._settings = get_settings() + self._cos_client = None + self._cos_config: COSConfig = self._settings.cos + + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + # ==================== 生命周期 ==================== + + def start(self): + """启动截图监听线程""" + if not self._cloud_redis: + logger.warning("[截图] 云端 Redis 不可用,截图处理器未启动") + return + if not self._cos_config.secret_id or not self._cos_config.bucket: + logger.warning("[截图] COS 配置不完整,截图处理器未启动") + return + + self._init_cos_client() + self._ensure_consumer_group() + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._listen_loop, + name="ScreenshotHandler", + daemon=True, + ) + self._thread.start() + logger.info("[截图] 截图处理器已启动") + + def stop(self): + """停止截图监听线程""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + logger.info("[截图] 截图处理器已停止") + + # ==================== COS 初始化 ==================== + + def _init_cos_client(self): + """初始化 COS 客户端""" + try: + from qcloud_cos import CosConfig, CosS3Client + + config = CosConfig( + Region=self._cos_config.region, + SecretId=self._cos_config.secret_id, + SecretKey=self._cos_config.secret_key, + ) + self._cos_client = CosS3Client(config) + logger.info("[截图] COS 客户端初始化成功: bucket=%s, region=%s", + self._cos_config.bucket, self._cos_config.region) + except Exception as e: + logger.error("[截图] COS 客户端初始化失败: %s", e) + + # ==================== Consumer Group ==================== + + def _ensure_consumer_group(self): + """确保 Redis Consumer Group 存在""" + try: + self._cloud_redis.xgroup_create( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, id="0", mkstream=True + ) + logger.info("[截图] 创建 consumer group: %s", SNAP_CONSUMER_GROUP) + except Exception as e: + # BUSYGROUP = group already exists, safe to ignore + if "BUSYGROUP" in str(e): + logger.debug("[截图] consumer group 已存在") + else: + logger.error("[截图] 创建 consumer group 失败: %s", e) + + # ==================== 主循环 ==================== + + def _listen_loop(self): + """XREADGROUP 循环消费截图请求""" + backoff = 5 + max_backoff = 60 + + while not self._stop_event.is_set(): + try: + results = self._cloud_redis.xreadgroup( + SNAP_CONSUMER_GROUP, + SNAP_CONSUMER_NAME, + {SNAP_REQUEST_STREAM: ">"}, + count=1, + block=5000, + ) + if not results: + continue + + backoff = 5 # 重置退避 + + for stream_name, messages in results: + for msg_id, fields in messages: + try: + self._handle_request(fields) + except Exception as e: + logger.error("[截图] 处理请求失败: %s", e) + finally: + # ACK 消息 + try: + self._cloud_redis.xack( + SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id + ) + except Exception: + pass + + except Exception as e: + if self._stop_event.is_set(): + return + logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + + # ==================== 请求处理 ==================== + + def _handle_request(self, fields: dict): + """处理单个截图请求""" + request_id = fields.get("request_id", "") + camera_code = fields.get("camera_code", "") + cos_path = fields.get("cos_path", "") + callback_url = fields.get("callback_url", "") + + if not request_id or not camera_code or not cos_path: + logger.warning("[截图] 请求字段不完整: %s", fields) + self._send_result(callback_url, request_id, camera_code, { + "status": "error", + "message": "请求字段不完整", + }) + return + + rtsp_url = fields.get("rtsp_url", "") + + logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", + request_id, camera_code, callback_url or "(无)") + + # 1. 抓帧 + jpeg_bytes = self._capture_frame(camera_code, rtsp_url) + if jpeg_bytes is None: + self._send_result(callback_url, request_id, camera_code, { + "status": "error", + "message": f"摄像头流未连接: {camera_code}", + }) + return + + # 2. 上传 COS(失败重试 1 次) + url = self._upload_to_cos(jpeg_bytes, cos_path) + if url is None: + # 重试一次 + logger.info("[截图] COS 上传失败,重试一次...") + url = self._upload_to_cos(jpeg_bytes, cos_path) + + if url is None: + self._send_result(callback_url, request_id, camera_code, { + "status": "error", + "message": "COS 上传失败", + }) + return + + # 3. 发送结果 + self._send_result(callback_url, request_id, camera_code, { + "status": "ok", + "url": url, + }) + + logger.info("[截图] 截图完成: camera=%s, url=%s", camera_code, url) + + # ==================== 抓帧 ==================== + + def _capture_frame(self, camera_code: str, rtsp_url: str = "") -> Optional[bytes]: + """从 MultiStreamManager 获取最新帧,无流时降级临时 RTSP 连接""" + # 优先从已有视频流获取(有 ROI 的摄像头,已连接) + stream = self._stream_manager.get_stream(camera_code) + if stream is not None and stream.is_connected: + frame = stream.get_latest_frame(timeout=2.0) + if frame is None: + frame = stream.read(timeout=2.0) + if frame is not None: + return self._encode_jpeg(frame.image) + + # 降级:临时 RTSP 连接抓帧(无 ROI 或流未连接的摄像头) + if rtsp_url: + return self._capture_ondemand(camera_code, rtsp_url) + + logger.warning("[截图] 未找到摄像头流且无 rtsp_url: %s", camera_code) + return None + + def _capture_ondemand(self, camera_code: str, rtsp_url: str) -> Optional[bytes]: + """临时连接 RTSP 抓取一帧,用完即断""" + cap = None + try: + logger.info("[截图] 临时连接 RTSP: %s → %s", camera_code, rtsp_url) + cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + if not cap.isOpened(): + logger.warning("[截图] 临时 RTSP 连接失败: %s", camera_code) + return None + + # 读取几帧丢弃(跳过关键帧解码延迟),取最后一帧 + frame = None + for _ in range(5): + ret, f = cap.read() + if ret and f is not None: + frame = f + + if frame is None: + logger.warning("[截图] 临时连接读帧失败: %s", camera_code) + return None + + return self._encode_jpeg(frame) + except Exception as e: + logger.error("[截图] 临时截图异常: %s, %s", camera_code, e) + return None + finally: + if cap is not None: + cap.release() + logger.debug("[截图] 临时 RTSP 连接已释放: %s", camera_code) + + def _encode_jpeg(self, image) -> Optional[bytes]: + """将帧编码为 JPEG 字节""" + try: + success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, 85]) + return buffer.tobytes() if success else None + except Exception as e: + logger.error("[截图] JPEG 编码失败: %s", e) + return None + + # ==================== COS 上传 ==================== + + # 预签名 URL 有效期(秒),必须大于 WVP 缓存 TTL(300s) + PRESIGNED_URL_EXPIRES = 3600 + + def _upload_to_cos(self, jpeg_bytes: bytes, cos_path: str) -> Optional[str]: + """上传 JPEG 到 COS,返回预签名访问 URL""" + if not self._cos_client: + logger.error("[截图] COS 客户端未初始化") + return None + + try: + body = io.BytesIO(jpeg_bytes) + self._cos_client.put_object( + Bucket=self._cos_config.bucket, + Body=body, + Key=cos_path, + ContentType="image/jpeg", + ) + # 生成预签名 URL(不附加额外 Params,保持 URL 简洁) + url = self._cos_client.get_presigned_url( + Method="GET", + Bucket=self._cos_config.bucket, + Key=cos_path, + Expired=self.PRESIGNED_URL_EXPIRES, + ) + return url + except Exception as e: + logger.error("[截图] COS 上传失败: %s", e) + return None + + # ==================== 结果发送(回调 + 降级) ==================== + + def _send_result(self, callback_url: str, request_id: str, camera_code: str, data: dict): + """发送截图结果:优先 HTTP 回调,失败则降级写 Redis""" + if not request_id: + return + + # 构造回调 body + body = { + "request_id": request_id, + "camera_code": camera_code, + **data, + } + + # 优先尝试 HTTP 回调 + if callback_url: + try: + url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH + resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT) + if resp.status_code < 300: + logger.info("[截图] HTTP 回调成功: request_id=%s", request_id) + return + else: + logger.warning("[截图] HTTP 回调返回 %d: request_id=%s", resp.status_code, request_id) + except Exception as e: + logger.warning("[截图] HTTP 回调失败: %s, 降级写 Redis", e) + + # 降级:写 Redis result key + self._write_result_redis(request_id, body) + + def _write_result_redis(self, request_id: str, data: dict): + """降级:将结果写入 Redis key(TTL 60s)""" + key = SNAP_RESULT_KEY_PREFIX + request_id + try: + self._cloud_redis.set(key, json.dumps(data), ex=SNAP_RESULT_TTL) + logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id) + except Exception as e: + logger.error("[截图] 降级写 Redis 也失败: %s", e) diff --git a/main.py b/main.py index 5047693..149f1e3 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ from core.tensorrt_engine import TensorRTEngine, EngineManager from core.postprocessor import PostProcessor from core.result_reporter import ResultReporter from core.alarm_upload_worker import AlarmUploadWorker +from core.screenshot_handler import ScreenshotHandler from algorithms import AlgorithmManager from utils.logger import get_logger, StructuredLogger from utils.version_control import get_version_control @@ -48,6 +49,7 @@ class EdgeInferenceService: self._postprocessor: Optional[PostProcessor] = None self._reporter: Optional[ResultReporter] = None self._alarm_worker: Optional[AlarmUploadWorker] = None + self._screenshot_handler: Optional[ScreenshotHandler] = None self._algorithm_manager: Optional[AlgorithmManager] = None self._debug_reload_thread: Optional[threading.Thread] = None self._debug_http_server = None @@ -94,7 +96,7 @@ class EdgeInferenceService: try: self._config_manager = get_config_sync_manager() self._config_manager.start_config_subscription() - if self._settings.config_sync_mode == "LOCAL" and self._config_manager: + if self._config_manager: def _on_config_update(topic, data): if self._algorithm_manager: # 保留状态地更新参数,避免告警重复 @@ -178,6 +180,45 @@ class EdgeInferenceService: self._logger.info("算法管理器初始化成功") except Exception as e: self._logger.error(f"算法管理器初始化失败: {e}") + + def _init_screenshot_handler(self): + """初始化截图处理器""" + try: + # 优先从 config_manager 获取已有的云端 Redis 连接 + cloud_redis = getattr(self._config_manager, '_cloud_redis', None) + + # LOCAL 模式下 config_manager 不初始化云端 Redis,需要独立创建 + if cloud_redis is None: + try: + import redis + cfg = self._settings.cloud_redis + cloud_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True, + ) + cloud_redis.ping() + self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") + except Exception as e: + self._logger.warning(f"截图处理器无法连接云端 Redis: {e}") + cloud_redis = None + + if cloud_redis and self._stream_manager: + self._screenshot_handler = ScreenshotHandler( + cloud_redis=cloud_redis, + stream_manager=self._stream_manager, + ) + self._screenshot_handler.start() + self._logger.info("截图处理器初始化成功") + else: + self._logger.warning("截图处理器跳过初始化(云端 Redis 不可达或流管理器不可用)") + except Exception as e: + self._logger.error(f"截图处理器初始化失败: {e}") def _start_debug_reload_watcher(self): """本地调试:监听文件触发同步""" @@ -257,6 +298,7 @@ class EdgeInferenceService: self._init_postprocessor() self._init_reporter() self._init_algorithm_manager() + self._init_screenshot_handler() self._start_debug_reload_watcher() self._start_debug_http_server() @@ -520,59 +562,63 @@ class EdgeInferenceService: self._logger.error(f"处理帧失败 {camera_id}: {e}") def _batch_process_rois(self): - """批量处理 ROI - 真正的 batch 推理""" + """批量处理 ROI - 真正的 batch 推理(按 max_batch_size 分块)""" with self._batch_lock: roi_items = self._batch_roi_queue if not roi_items: return self._batch_roi_queue = [] - try: - images = [item[4] for item in roi_items] - scale_infos = [item[5] for item in roi_items] + engine = self._engine_manager.get_engine("default") + if engine is None: + return - # 真正的 batch: 将所有 ROI 裁剪图拼成 [N,3,H,W] 一次推理 - batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch( - images - ) + # 按 max_batch_size 分块处理 + for chunk_start in range(0, len(roi_items), self._max_batch_size): + chunk = roi_items[chunk_start:chunk_start + self._max_batch_size] - engine = self._engine_manager.get_engine("default") - if engine is None: - return + try: + images = [item[4] for item in chunk] + scale_infos = [item[5] for item in chunk] - # 一次性推理整个 batch - outputs, inference_time_ms = engine.infer(batch_data) - - # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) - import numpy as np - if isinstance(outputs, np.ndarray): - self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") - elif isinstance(outputs, (list, tuple)): - shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs] - self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") - - batch_size = len(roi_items) - batch_results = self._postprocessor.batch_process_detections( - outputs, - batch_size, - conf_threshold=self._settings.inference.conf_threshold - ) - - total_detections = sum(len(r[0]) for r in batch_results) - self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") - - for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(roi_items): - boxes, scores, class_ids = batch_results[idx] - - # 无论是否检测到目标都要调用算法(离岗检测需要"无人"信号) - self._handle_detections( - camera_id, roi, bind, frame, - boxes, scores, class_ids, - scale_info + # 真正的 batch: 将所有 ROI 裁剪图拼成 [N,3,H,W] 一次推理 + batch_data, _ = self._preprocessor._batch_preprocessor.preprocess_batch( + images ) - except Exception as e: - self._logger.error(f"批量处理 ROI 失败: {e}") + # 一次性推理整个 batch + outputs, inference_time_ms = engine.infer(batch_data) + + # 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别) + import numpy as np + if isinstance(outputs, np.ndarray): + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, output shape={outputs.shape}, 耗时={inference_time_ms:.1f}ms") + elif isinstance(outputs, (list, tuple)): + shapes = [o.shape if hasattr(o, 'shape') else type(o) for o in outputs] + self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") + + batch_size = len(chunk) + batch_results = self._postprocessor.batch_process_detections( + outputs, + batch_size, + conf_threshold=self._settings.inference.conf_threshold + ) + + total_detections = sum(len(r[0]) for r in batch_results) + self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") + + for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(chunk): + boxes, scores, class_ids = batch_results[idx] + + # 无论是否检测到目标都要调用算法(离岗检测需要"无人"信号) + self._handle_detections( + camera_id, roi, bind, frame, + boxes, scores, class_ids, + scale_info + ) + + except Exception as e: + self._logger.error(f"批量处理 ROI 失败: {e}") def _process_roi_frame(self, camera_id: str, frame: VideoFrame, roi): """收集 ROI 帧数据 - 已集成到 _process_frame 中""" @@ -848,6 +894,9 @@ class EdgeInferenceService: if self._alarm_worker: self._alarm_worker.stop() + if self._screenshot_handler: + self._screenshot_handler.stop() + if self._reporter: self._reporter.close() diff --git a/requirements.txt b/requirements.txt index 0efb946..f081142 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,9 @@ paho-mqtt==1.6.1 # Redis客户端 - Redis 4.6.0,4.x最终稳定版 redis==4.6.0 +# 腾讯云COS SDK - 用于截图上传 +cos-python-sdk-v5>=1.9.30 + # ============================================================ # 工具库 # ============================================================ diff --git a/utils/logger.py b/utils/logger.py index e8faa00..d178292 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -100,8 +100,6 @@ class StructuredLogger: # TensorRT 推理引擎 "core.tensorrt_engine", "tensorrt", - # 配置同步 - "core.config_sync", # 数据库 "config.database", # 算法管理器(注册、配置订阅)