""" 云端告警平台上报模块 将 ai_edge 项目的告警上报到云端告警平台 使用方式: 1. 将此文件复制到 ai_edge 项目的 core/ 目录 2. 在 main.py 中初始化并使用 """ import json import logging import uuid import io from datetime import datetime from typing import Optional, Dict, Any from pathlib import Path import numpy as np import requests from PIL import Image logger = logging.getLogger("cloud_reporter") class CloudAlertReporter: """云端告警平台上报器""" def __init__( self, platform_url: str = "http://localhost:8000", timeout: int = 10, retry_times: int = 3, ): self._platform_url = platform_url.rstrip("/") self._api_url = f"{self._platform_url}/api/v1/alerts" self._timeout = timeout self._retry_times = retry_times self._enabled = True self._performance_stats = { "alerts_reported": 0, "alerts_failed": 0, "images_uploaded": 0, } self._logger = logging.getLogger("cloud_reporter") def report_alert( self, camera_id: str, roi_id: str, alert_type: str, algorithm: str = "YOLO11", confidence: float = 1.0, duration_minutes: Optional[int] = None, message: Optional[str] = None, screenshot: Optional[np.ndarray] = None, ) -> bool: """ 上报告警到云端平台 Args: camera_id: 摄像头ID roi_id: ROI区域ID alert_type: 告警类型 (leave_post, intrusion等) algorithm: 算法名称 confidence: 置信度 (0-1) duration_minutes: 离岗时长(分钟) message: 告警消息 screenshot: 抓拍图片 (numpy array, BGR格式) Returns: 是否上报成功 """ alert_data = { "camera_id": camera_id, "roi_id": roi_id, "alert_type": alert_type, "algorithm": algorithm, "confidence": int(confidence * 100), "duration_minutes": duration_minutes, "trigger_time": datetime.utcnow().isoformat() + "Z", "message": message or f"检测到{alert_type}告警", } files = None if screenshot is not None: try: image_bytes = self._image_to_bytes(screenshot) files = { "snapshot": ("alert.jpg", image_bytes, "image/jpeg"), "data": (None, json.dumps(alert_data), "application/json"), } except Exception as e: self._logger.error(f"图片转换失败: {e}") files = { "data": (None, json.dumps(alert_data), "application/json"), } else: files = { "data": (None, json.dumps(alert_data), "application/json"), } for attempt in range(self._retry_times): try: response = requests.post( self._api_url, files=files, timeout=self._timeout, ) if response.status_code in [200, 201]: result = response.json() self._performance_stats["alerts_reported"] += 1 if screenshot is not None: self._performance_stats["images_uploaded"] += 1 self._logger.info(f"告警上报成功: {result.get('alert_no', 'N/A')}") return True else: self._logger.warning( f"告警上报失败 (尝试 {attempt + 1}/{self._retry_times}): " f"HTTP {response.status_code}" ) except requests.exceptions.ConnectionError as e: self._logger.warning( f"连接告警平台失败 (尝试 {attempt + 1}/{self._retry_times}): {e}" ) except Exception as e: self._logger.error(f"上报告警异常: {e}") self._performance_stats["alerts_failed"] += 1 return False def _image_to_bytes(self, image: np.ndarray, quality: int = 85) -> bytes: """ 将 numpy 图片转换为 JPEG 字节 Args: image: BGR 格式的 numpy 数组 quality: JPEG 质量 (1-100) Returns: JPEG 字节数据 """ if image is None: return b"" if len(image.shape) == 3 and image.shape[2] == 3: image_rgb = image[:, :, ::-1] else: image_rgb = image pil_image = Image.fromarray(image_rgb) buffer = io.BytesIO() pil_image.save(buffer, format="JPEG", quality=quality) return buffer.getvalue() def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" return dict(self._performance_stats) def set_enabled(self, enabled: bool): """启用/禁用上报""" self._enabled = enabled self._logger.info(f"云端上报已{'启用' if enabled else '禁用'}") _cloud_reporter: Optional[CloudAlertReporter] = None def get_cloud_reporter() -> CloudAlertReporter: """获取全局云端告警上报器实例""" global _cloud_reporter if _cloud_reporter is None: _cloud_reporter = CloudAlertReporter() return _cloud_reporter def init_cloud_reporter( platform_url: str = "http://localhost:8000", enabled: bool = True, ) -> CloudAlertReporter: """初始化云端告警上报器""" global _cloud_reporter _cloud_reporter = CloudAlertReporter(platform_url=platform_url) _cloud_reporter.set_enabled(enabled) return _cloud_reporter # 便捷函数 def report_alert_to_cloud( camera_id: str, roi_id: str, alert_type: str, **kwargs, ) -> bool: """便捷的上报告警函数""" reporter = get_cloud_reporter() return reporter.report_alert( camera_id=camera_id, roi_id=roi_id, alert_type=alert_type, **kwargs, )