Files
iot-device-management-service/docs/cloud_alert_reporter.py

206 lines
6.1 KiB
Python
Raw Permalink Normal View History

"""
云端告警平台上报模块
ai_edge 项目的告警上报到云端告警平台
使用方式
1. 将此文件复制到 ai_edge 项目的 core/ 目录
2. main.py 中初始化并使用
"""
import json
import logging
import uuid
import io
from datetime import datetime
from typing import Optional, Dict, Any
from pathlib import Path
import numpy as np
import requests
from PIL import Image
logger = logging.getLogger("cloud_reporter")
class CloudAlertReporter:
"""云端告警平台上报器"""
def __init__(
self,
platform_url: str = "http://localhost:8000",
timeout: int = 10,
retry_times: int = 3,
):
self._platform_url = platform_url.rstrip("/")
self._api_url = f"{self._platform_url}/api/v1/alerts"
self._timeout = timeout
self._retry_times = retry_times
self._enabled = True
self._performance_stats = {
"alerts_reported": 0,
"alerts_failed": 0,
"images_uploaded": 0,
}
self._logger = logging.getLogger("cloud_reporter")
def report_alert(
self,
camera_id: str,
roi_id: str,
alert_type: str,
algorithm: str = "YOLO11",
confidence: float = 1.0,
duration_minutes: Optional[int] = None,
message: Optional[str] = None,
screenshot: Optional[np.ndarray] = None,
) -> bool:
"""
上报告警到云端平台
Args:
camera_id: 摄像头ID
roi_id: ROI区域ID
alert_type: 告警类型 (leave_post, intrusion等)
algorithm: 算法名称
confidence: 置信度 (0-1)
duration_minutes: 离岗时长分钟
message: 告警消息
screenshot: 抓拍图片 (numpy array, BGR格式)
Returns:
是否上报成功
"""
alert_data = {
"camera_id": camera_id,
"roi_id": roi_id,
"alert_type": alert_type,
"algorithm": algorithm,
"confidence": int(confidence * 100),
"duration_minutes": duration_minutes,
"trigger_time": datetime.utcnow().isoformat() + "Z",
"message": message or f"检测到{alert_type}告警",
}
files = None
if screenshot is not None:
try:
image_bytes = self._image_to_bytes(screenshot)
files = {
"snapshot": ("alert.jpg", image_bytes, "image/jpeg"),
"data": (None, json.dumps(alert_data), "application/json"),
}
except Exception as e:
self._logger.error(f"图片转换失败: {e}")
files = {
"data": (None, json.dumps(alert_data), "application/json"),
}
else:
files = {
"data": (None, json.dumps(alert_data), "application/json"),
}
for attempt in range(self._retry_times):
try:
response = requests.post(
self._api_url,
files=files,
timeout=self._timeout,
)
if response.status_code in [200, 201]:
result = response.json()
self._performance_stats["alerts_reported"] += 1
if screenshot is not None:
self._performance_stats["images_uploaded"] += 1
self._logger.info(f"告警上报成功: {result.get('alert_no', 'N/A')}")
return True
else:
self._logger.warning(
f"告警上报失败 (尝试 {attempt + 1}/{self._retry_times}): "
f"HTTP {response.status_code}"
)
except requests.exceptions.ConnectionError as e:
self._logger.warning(
f"连接告警平台失败 (尝试 {attempt + 1}/{self._retry_times}): {e}"
)
except Exception as e:
self._logger.error(f"上报告警异常: {e}")
self._performance_stats["alerts_failed"] += 1
return False
def _image_to_bytes(self, image: np.ndarray, quality: int = 85) -> bytes:
"""
numpy 图片转换为 JPEG 字节
Args:
image: BGR 格式的 numpy 数组
quality: JPEG 质量 (1-100)
Returns:
JPEG 字节数据
"""
if image is None:
return b""
if len(image.shape) == 3 and image.shape[2] == 3:
image_rgb = image[:, :, ::-1]
else:
image_rgb = image
pil_image = Image.fromarray(image_rgb)
buffer = io.BytesIO()
pil_image.save(buffer, format="JPEG", quality=quality)
return buffer.getvalue()
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
return dict(self._performance_stats)
def set_enabled(self, enabled: bool):
"""启用/禁用上报"""
self._enabled = enabled
self._logger.info(f"云端上报已{'启用' if enabled else '禁用'}")
_cloud_reporter: Optional[CloudAlertReporter] = None
def get_cloud_reporter() -> CloudAlertReporter:
"""获取全局云端告警上报器实例"""
global _cloud_reporter
if _cloud_reporter is None:
_cloud_reporter = CloudAlertReporter()
return _cloud_reporter
def init_cloud_reporter(
platform_url: str = "http://localhost:8000",
enabled: bool = True,
) -> CloudAlertReporter:
"""初始化云端告警上报器"""
global _cloud_reporter
_cloud_reporter = CloudAlertReporter(platform_url=platform_url)
_cloud_reporter.set_enabled(enabled)
return _cloud_reporter
# 便捷函数
def report_alert_to_cloud(
camera_id: str,
roi_id: str,
alert_type: str,
**kwargs,
) -> bool:
"""便捷的上报告警函数"""
reporter = get_cloud_reporter()
return reporter.report_alert(
camera_id=camera_id,
roi_id=roi_id,
alert_type=alert_type,
**kwargs,
)