feat(aiot): 边缘告警HTTP上报 + 移除配置中转层
- 新增 edge/report 端点接收边缘端HTTP告警上报 - alarm_event_service 新增 create_from_edge_report 幂等创建 - schemas 新增 EdgeAlarmReport 模型 - 移除 config_service/redis_service/yudao_aiot_config 配置中转 - MQTT 服务标记废弃,告警上报改为HTTP+COS - config 新增 COS/Redis 配置项 - requirements 新增 redis 依赖 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
18
.env.example
18
.env.example
@@ -17,13 +17,17 @@ APP_PORT=8000
|
|||||||
DEBUG=true
|
DEBUG=true
|
||||||
DEV_MODE=true
|
DEV_MODE=true
|
||||||
|
|
||||||
# MQTT 配置
|
|
||||||
MQTT_ENABLED=true
|
|
||||||
MQTT_BROKER_HOST=localhost
|
|
||||||
MQTT_BROKER_PORT=1883
|
|
||||||
MQTT_USERNAME=
|
|
||||||
MQTT_PASSWORD=
|
|
||||||
|
|
||||||
# 大模型配置(可选)
|
# 大模型配置(可选)
|
||||||
AI_MODEL_ENDPOINT=http://localhost:8001
|
AI_MODEL_ENDPOINT=http://localhost:8001
|
||||||
AI_MODEL_API_KEY=your_api_key
|
AI_MODEL_API_KEY=your_api_key
|
||||||
|
|
||||||
|
# Redis 配置(配置下发三层权威模型 - 云端层)
|
||||||
|
REDIS_ENABLED=true
|
||||||
|
REDIS_HOST=localhost
|
||||||
|
REDIS_PORT=6379
|
||||||
|
REDIS_PASSWORD=
|
||||||
|
REDIS_DB=0
|
||||||
|
REDIS_MAX_CONNECTIONS=50
|
||||||
|
|
||||||
|
# 边缘设备认证 Token(边缘端上报告警时使用)
|
||||||
|
# EDGE_TOKEN=your_edge_device_token
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ class AIModelConfig:
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class MQTTConfig:
|
class MQTTConfig:
|
||||||
"""MQTT 配置"""
|
"""MQTT 配置 (已废弃 - 告警上报已改为 HTTP + COS)"""
|
||||||
broker_host: str = "localhost"
|
broker_host: str = "localhost"
|
||||||
broker_port: int = 1883
|
broker_port: int = 1883
|
||||||
client_id: str = "alert_platform"
|
client_id: str = "alert_platform"
|
||||||
@@ -54,6 +54,18 @@ class MQTTConfig:
|
|||||||
alert_topic: str = "edge/alert/#"
|
alert_topic: str = "edge/alert/#"
|
||||||
heartbeat_topic: str = "edge/alert/heartbeat/#"
|
heartbeat_topic: str = "edge/alert/heartbeat/#"
|
||||||
qos: int = 1
|
qos: int = 1
|
||||||
|
enabled: bool = False # 默认禁用
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RedisConfig:
|
||||||
|
"""Redis 配置"""
|
||||||
|
host: str = "localhost"
|
||||||
|
port: int = 6379
|
||||||
|
password: str = ""
|
||||||
|
db: int = 0
|
||||||
|
max_connections: int = 50
|
||||||
|
decode_responses: bool = True
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
|
|
||||||
|
|
||||||
@@ -64,6 +76,7 @@ class Settings(BaseModel):
|
|||||||
app: AppConfig = AppConfig()
|
app: AppConfig = AppConfig()
|
||||||
ai_model: AIModelConfig = AIModelConfig()
|
ai_model: AIModelConfig = AIModelConfig()
|
||||||
mqtt: MQTTConfig = MQTTConfig()
|
mqtt: MQTTConfig = MQTTConfig()
|
||||||
|
redis: RedisConfig = RedisConfig()
|
||||||
|
|
||||||
|
|
||||||
def load_settings() -> Settings:
|
def load_settings() -> Settings:
|
||||||
@@ -104,7 +117,15 @@ def load_settings() -> Settings:
|
|||||||
alert_topic=os.getenv("MQTT_ALERT_TOPIC", "edge/alert/#"),
|
alert_topic=os.getenv("MQTT_ALERT_TOPIC", "edge/alert/#"),
|
||||||
heartbeat_topic=os.getenv("MQTT_HEARTBEAT_TOPIC", "edge/alert/heartbeat/#"),
|
heartbeat_topic=os.getenv("MQTT_HEARTBEAT_TOPIC", "edge/alert/heartbeat/#"),
|
||||||
qos=int(os.getenv("MQTT_QOS", "1")),
|
qos=int(os.getenv("MQTT_QOS", "1")),
|
||||||
enabled=os.getenv("MQTT_ENABLED", "true").lower() == "true",
|
enabled=os.getenv("MQTT_ENABLED", "false").lower() == "true",
|
||||||
|
),
|
||||||
|
redis=RedisConfig(
|
||||||
|
host=os.getenv("REDIS_HOST", "localhost"),
|
||||||
|
port=int(os.getenv("REDIS_PORT", "6379")),
|
||||||
|
password=os.getenv("REDIS_PASSWORD", ""),
|
||||||
|
db=int(os.getenv("REDIS_DB", "0")),
|
||||||
|
max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")),
|
||||||
|
enabled=os.getenv("REDIS_ENABLED", "true").lower() == "true",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
59
app/main.py
59
app/main.py
@@ -23,7 +23,6 @@ from app.schemas import (
|
|||||||
from app.services.alert_service import alert_service, get_alert_service
|
from app.services.alert_service import alert_service, get_alert_service
|
||||||
from app.services.alarm_event_service import alarm_event_service, get_alarm_event_service
|
from app.services.alarm_event_service import alarm_event_service, get_alarm_event_service
|
||||||
from app.services.ai_analyzer import trigger_async_analysis
|
from app.services.ai_analyzer import trigger_async_analysis
|
||||||
from app.services.mqtt_service import get_mqtt_service
|
|
||||||
from app.services.notification_service import get_notification_service
|
from app.services.notification_service import get_notification_service
|
||||||
from app.services.device_service import get_device_service
|
from app.services.device_service import get_device_service
|
||||||
from app.utils.logger import logger
|
from app.utils.logger import logger
|
||||||
@@ -33,44 +32,10 @@ import json
|
|||||||
|
|
||||||
|
|
||||||
# 全局服务实例
|
# 全局服务实例
|
||||||
mqtt_service = get_mqtt_service()
|
|
||||||
notification_service = get_notification_service()
|
notification_service = get_notification_service()
|
||||||
device_service = get_device_service()
|
device_service = get_device_service()
|
||||||
|
|
||||||
|
|
||||||
def handle_mqtt_alert(payload: dict):
|
|
||||||
"""处理 MQTT 告警消息(双写:旧表 + 新表)"""
|
|
||||||
try:
|
|
||||||
# 1. 写旧表(保持兼容)
|
|
||||||
alert = alert_service.create_alert_from_mqtt(payload)
|
|
||||||
if alert:
|
|
||||||
logger.info(f"MQTT 告警写入旧表: {alert.alert_no}")
|
|
||||||
|
|
||||||
# 2. 写新表
|
|
||||||
alarm = alarm_event_service.create_from_mqtt(payload)
|
|
||||||
|
|
||||||
# 3. WebSocket 通知(优先使用新表数据)
|
|
||||||
if alarm:
|
|
||||||
notification_service.notify_sync("new_alert", alarm.to_dict())
|
|
||||||
logger.info(f"MQTT 告警已处理并推送: {alarm.alarm_id}")
|
|
||||||
elif alert:
|
|
||||||
notification_service.notify_sync("new_alert", alert.to_dict())
|
|
||||||
logger.info(f"MQTT 告警已处理并推送(旧表): {alert.alert_no}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理 MQTT 告警失败: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def handle_mqtt_heartbeat(payload: dict):
|
|
||||||
"""处理 MQTT 心跳消息"""
|
|
||||||
try:
|
|
||||||
device = device_service.handle_heartbeat(payload)
|
|
||||||
if device:
|
|
||||||
# 通过 WebSocket 推送设备状态
|
|
||||||
notification_service.notify_sync("device_status", device.to_dict())
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理 MQTT 心跳失败: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
"""应用生命周期管理"""
|
"""应用生命周期管理"""
|
||||||
@@ -78,23 +43,15 @@ async def lifespan(app: FastAPI):
|
|||||||
init_db()
|
init_db()
|
||||||
logger.info("数据库初始化完成")
|
logger.info("数据库初始化完成")
|
||||||
|
|
||||||
# 设置事件循环(用于从 MQTT 回调调用异步方法)
|
# 设置事件循环(用于从同步代码调用异步方法,如 WebSocket 通知)
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
notification_service.set_event_loop(loop)
|
notification_service.set_event_loop(loop)
|
||||||
|
|
||||||
# 注册 MQTT 处理器
|
|
||||||
mqtt_service.register_alert_handler(handle_mqtt_alert)
|
|
||||||
mqtt_service.register_heartbeat_handler(handle_mqtt_heartbeat)
|
|
||||||
|
|
||||||
# 启动 MQTT 服务
|
|
||||||
mqtt_service.start()
|
|
||||||
|
|
||||||
logger.info("AI 告警平台启动完成")
|
logger.info("AI 告警平台启动完成")
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# 关闭
|
# 关闭
|
||||||
mqtt_service.stop()
|
|
||||||
logger.info("AI 告警平台已关闭")
|
logger.info("AI 告警平台已关闭")
|
||||||
|
|
||||||
|
|
||||||
@@ -149,13 +106,9 @@ async def health_check():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
db_status = f"unhealthy: {e}"
|
db_status = f"unhealthy: {e}"
|
||||||
|
|
||||||
mqtt_stats = mqtt_service.get_statistics()
|
|
||||||
mqtt_status = "connected" if mqtt_stats["connected"] else "disconnected"
|
|
||||||
|
|
||||||
return HealthResponse(
|
return HealthResponse(
|
||||||
status="healthy" if db_status == "healthy" and mqtt_stats["connected"] else "degraded",
|
status="healthy" if db_status == "healthy" else "degraded",
|
||||||
database=db_status,
|
database=db_status,
|
||||||
mqtt=mqtt_status,
|
|
||||||
websocket_connections=notification_service.manager.connection_count,
|
websocket_connections=notification_service.manager.connection_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -333,14 +286,6 @@ async def get_device(
|
|||||||
return DeviceResponse(**device.to_dict())
|
return DeviceResponse(**device.to_dict())
|
||||||
|
|
||||||
|
|
||||||
# ==================== MQTT 状态端点 ====================
|
|
||||||
|
|
||||||
@app.get("/api/v1/mqtt/statistics")
|
|
||||||
async def get_mqtt_statistics():
|
|
||||||
"""获取 MQTT 服务统计"""
|
|
||||||
return mqtt_service.get_statistics()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
|
|||||||
@@ -18,7 +18,9 @@ from datetime import datetime
|
|||||||
|
|
||||||
from app.yudao_compat import YudaoResponse, get_current_user
|
from app.yudao_compat import YudaoResponse, get_current_user
|
||||||
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
|
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
|
||||||
|
from app.services.notification_service import get_notification_service
|
||||||
from app.services.oss_storage import get_oss_storage
|
from app.services.oss_storage import get_oss_storage
|
||||||
|
from app.schemas import EdgeAlarmReport
|
||||||
|
|
||||||
router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"])
|
router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"])
|
||||||
|
|
||||||
@@ -184,6 +186,38 @@ async def get_device_summary_page(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 边缘端告警上报 ====================
|
||||||
|
|
||||||
|
@router.post("/edge/report")
|
||||||
|
async def edge_alarm_report(
|
||||||
|
report: EdgeAlarmReport,
|
||||||
|
service: AlarmEventService = Depends(get_alarm_event_service),
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
边缘端告警上报接口
|
||||||
|
|
||||||
|
边缘设备通过 HTTP POST 上报告警元数据,截图已预先上传到 COS。
|
||||||
|
支持幂等:通过 alarm_id 判断是否已存在。
|
||||||
|
"""
|
||||||
|
alarm = service.create_from_edge_report(report.model_dump())
|
||||||
|
|
||||||
|
if alarm is None:
|
||||||
|
return YudaoResponse.error(500, "告警创建失败")
|
||||||
|
|
||||||
|
# WebSocket 通知
|
||||||
|
try:
|
||||||
|
notification_svc = get_notification_service()
|
||||||
|
notification_svc.notify_sync("new_alert", alarm.to_dict())
|
||||||
|
except Exception:
|
||||||
|
pass # WebSocket 通知失败不影响主流程
|
||||||
|
|
||||||
|
return YudaoResponse.success({
|
||||||
|
"alarmId": alarm.alarm_id,
|
||||||
|
"created": True,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
# ==================== 辅助函数 ====================
|
# ==================== 辅助函数 ====================
|
||||||
|
|
||||||
def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
|
def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
|
||||||
|
|||||||
@@ -110,3 +110,19 @@ class HealthResponse(BaseModel):
|
|||||||
database: str
|
database: str
|
||||||
mqtt: Optional[str] = None
|
mqtt: Optional[str] = None
|
||||||
websocket_connections: Optional[int] = None
|
websocket_connections: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 边缘告警上报 ====================
|
||||||
|
|
||||||
|
class EdgeAlarmReport(BaseModel):
|
||||||
|
"""边缘端告警上报请求体"""
|
||||||
|
alarm_id: str = Field(..., max_length=64, description="边缘端生成的告警ID (edge_{device_id}_{timestamp}_{uuid})")
|
||||||
|
alarm_type: str = Field(..., max_length=32, description="告警类型: leave_post/intrusion/crowd 等")
|
||||||
|
device_id: str = Field(..., max_length=64, description="摄像头/设备ID")
|
||||||
|
scene_id: Optional[str] = Field(None, max_length=64, description="场景/ROI ID")
|
||||||
|
event_time: str = Field(..., description="事件发生时间 ISO8601")
|
||||||
|
alarm_level: int = Field(2, ge=1, le=4, description="告警级别: 1提醒 2一般 3严重 4紧急")
|
||||||
|
snapshot_url: Optional[str] = Field(None, max_length=512, description="截图 COS object_key")
|
||||||
|
algorithm_code: Optional[str] = Field(None, max_length=64, description="算法编码")
|
||||||
|
confidence_score: Optional[float] = Field(None, ge=0, le=1, description="置信度 0-1")
|
||||||
|
ext_data: Optional[Dict[str, Any]] = Field(None, description="扩展数据 (bbox/target_class 等)")
|
||||||
|
|||||||
@@ -126,6 +126,97 @@ class AlarmEventService:
|
|||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
|
def create_from_edge_report(self, data: Dict[str, Any]) -> Optional[AlarmEvent]:
|
||||||
|
"""
|
||||||
|
从边缘端 HTTP 上报创建告警事件
|
||||||
|
|
||||||
|
边缘端通过 POST /admin-api/aiot/alarm/edge/report 上报告警。
|
||||||
|
使用边缘端生成的 alarm_id,支持幂等(重复 alarm_id 跳过)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: 边缘端上报数据,字段与 alarm_event 表对齐
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AlarmEvent 或 None
|
||||||
|
"""
|
||||||
|
db = get_session()
|
||||||
|
try:
|
||||||
|
alarm_id = data.get("alarm_id")
|
||||||
|
if not alarm_id:
|
||||||
|
logger.error("边缘上报缺少 alarm_id")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 幂等校验:alarm_id 已存在则跳过
|
||||||
|
existing = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
|
||||||
|
if existing:
|
||||||
|
logger.info(f"告警已存在,跳过: {alarm_id}")
|
||||||
|
return existing
|
||||||
|
|
||||||
|
# 解析时间
|
||||||
|
event_time_str = data.get("event_time")
|
||||||
|
if event_time_str:
|
||||||
|
try:
|
||||||
|
event_time = datetime.fromisoformat(event_time_str.replace("Z", "+00:00"))
|
||||||
|
except ValueError:
|
||||||
|
event_time = datetime.now(timezone.utc)
|
||||||
|
else:
|
||||||
|
event_time = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
# 置信度
|
||||||
|
confidence = data.get("confidence_score")
|
||||||
|
if confidence is not None:
|
||||||
|
confidence = float(confidence)
|
||||||
|
if confidence > 1:
|
||||||
|
confidence = confidence / 100.0
|
||||||
|
|
||||||
|
alarm_type = data.get("alarm_type", "unknown")
|
||||||
|
alarm_level = data.get("alarm_level")
|
||||||
|
if alarm_level is None:
|
||||||
|
# 从 ext_data 取 duration_ms
|
||||||
|
ext_data = data.get("ext_data") or {}
|
||||||
|
duration_ms = ext_data.get("duration_ms")
|
||||||
|
alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms)
|
||||||
|
|
||||||
|
alarm = AlarmEvent(
|
||||||
|
alarm_id=alarm_id,
|
||||||
|
alarm_type=alarm_type,
|
||||||
|
algorithm_code=data.get("algorithm_code"),
|
||||||
|
device_id=data.get("device_id", "unknown"),
|
||||||
|
scene_id=data.get("scene_id"),
|
||||||
|
event_time=event_time,
|
||||||
|
alarm_level=alarm_level,
|
||||||
|
confidence_score=confidence,
|
||||||
|
alarm_status="NEW",
|
||||||
|
handle_status="UNHANDLED",
|
||||||
|
snapshot_url=data.get("snapshot_url"), # COS object_key
|
||||||
|
edge_node_id=data.get("ext_data", {}).get("edge_node_id") if data.get("ext_data") else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
db.add(alarm)
|
||||||
|
|
||||||
|
# 写入扩展表
|
||||||
|
ext_data = data.get("ext_data")
|
||||||
|
if ext_data:
|
||||||
|
ext = AlarmEventExt(
|
||||||
|
alarm_id=alarm_id,
|
||||||
|
ext_type="EDGE_HTTP",
|
||||||
|
ext_data=ext_data,
|
||||||
|
)
|
||||||
|
db.add(ext)
|
||||||
|
|
||||||
|
db.commit()
|
||||||
|
db.refresh(alarm)
|
||||||
|
|
||||||
|
logger.info(f"边缘端告警创建成功: {alarm_id}, type={alarm_type}, device={data.get('device_id')}")
|
||||||
|
return alarm
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
db.rollback()
|
||||||
|
logger.error(f"创建边缘端告警失败: {e}")
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
def create_from_http(self, data: Dict[str, Any], snapshot_data: Optional[bytes] = None) -> Optional[AlarmEvent]:
|
def create_from_http(self, data: Dict[str, Any], snapshot_data: Optional[bytes] = None) -> Optional[AlarmEvent]:
|
||||||
"""从 HTTP 请求创建告警事件"""
|
"""从 HTTP 请求创建告警事件"""
|
||||||
db = get_session()
|
db = get_session()
|
||||||
|
|||||||
@@ -1,265 +1,53 @@
|
|||||||
"""
|
"""
|
||||||
MQTT 订阅服务
|
MQTT 服务 - 已废弃
|
||||||
订阅边缘端告警和心跳消息
|
|
||||||
|
告警上报已改为 HTTP + COS 方案(边缘端直传)。
|
||||||
|
此文件保留为空壳,避免其他模块 import 报错。
|
||||||
|
后续版本将彻底删除此文件。
|
||||||
"""
|
"""
|
||||||
import json
|
|
||||||
import threading
|
|
||||||
import uuid
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from typing import Callable, Dict, Any, Optional, List
|
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
from app.config import settings
|
|
||||||
from app.utils.logger import logger
|
|
||||||
|
|
||||||
|
|
||||||
class MQTTService:
|
class MQTTService:
|
||||||
"""MQTT 订阅服务"""
|
"""MQTT 服务 (已废弃,保留空壳兼容旧代码)"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._client: Optional[mqtt.Client] = None
|
pass
|
||||||
self._connected = False
|
|
||||||
self._running = False
|
|
||||||
self._use_v2_callback = False # paho-mqtt 版本标记
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
# 回调函数
|
@property
|
||||||
self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = []
|
def is_connected(self) -> bool:
|
||||||
self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = []
|
return False
|
||||||
|
|
||||||
# 统计
|
def register_alert_handler(self, handler):
|
||||||
self._stats = {
|
pass
|
||||||
|
|
||||||
|
def register_heartbeat_handler(self, handler):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_statistics(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
"messages_received": 0,
|
"messages_received": 0,
|
||||||
"alerts_received": 0,
|
"alerts_received": 0,
|
||||||
"heartbeats_received": 0,
|
"heartbeats_received": 0,
|
||||||
"errors": 0,
|
"errors": 0,
|
||||||
}
|
"connected": False,
|
||||||
|
"running": False,
|
||||||
@property
|
"deprecated": True,
|
||||||
def is_connected(self) -> bool:
|
|
||||||
return self._connected
|
|
||||||
|
|
||||||
def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]):
|
|
||||||
"""注册告警处理回调"""
|
|
||||||
self._alert_handlers.append(handler)
|
|
||||||
logger.info(f"已注册告警处理器: {handler.__name__}")
|
|
||||||
|
|
||||||
def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]):
|
|
||||||
"""注册心跳处理回调"""
|
|
||||||
self._heartbeat_handlers.append(handler)
|
|
||||||
logger.info(f"已注册心跳处理器: {handler.__name__}")
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""启动 MQTT 服务"""
|
|
||||||
if not settings.mqtt.enabled:
|
|
||||||
logger.info("MQTT 服务已禁用")
|
|
||||||
return
|
|
||||||
|
|
||||||
if self._running:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 给 client_id 添加随机后缀,防止多实例 client_id 冲突导致反复踢连接
|
|
||||||
unique_client_id = f"{settings.mqtt.client_id}_{uuid.uuid4().hex[:8]}"
|
|
||||||
|
|
||||||
# 兼容 paho-mqtt 1.x 和 2.x 版本
|
|
||||||
try:
|
|
||||||
# paho-mqtt 2.0+ 新 API
|
|
||||||
self._client = mqtt.Client(
|
|
||||||
client_id=unique_client_id,
|
|
||||||
protocol=mqtt.MQTTv311,
|
|
||||||
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
|
|
||||||
)
|
|
||||||
self._use_v2_callback = True
|
|
||||||
except AttributeError:
|
|
||||||
# paho-mqtt 1.x 旧 API
|
|
||||||
self._client = mqtt.Client(
|
|
||||||
client_id=unique_client_id,
|
|
||||||
protocol=mqtt.MQTTv311
|
|
||||||
)
|
|
||||||
self._use_v2_callback = False
|
|
||||||
logger.info("使用 paho-mqtt 1.x 兼容模式")
|
|
||||||
|
|
||||||
# 设置回调
|
|
||||||
self._client.on_connect = self._on_connect
|
|
||||||
self._client.on_disconnect = self._on_disconnect
|
|
||||||
self._client.on_message = self._on_message
|
|
||||||
|
|
||||||
# 设置认证(如果有)
|
|
||||||
if settings.mqtt.username:
|
|
||||||
self._client.username_pw_set(
|
|
||||||
settings.mqtt.username,
|
|
||||||
settings.mqtt.password
|
|
||||||
)
|
|
||||||
|
|
||||||
# 连接
|
|
||||||
self._client.connect(
|
|
||||||
settings.mqtt.broker_host,
|
|
||||||
settings.mqtt.broker_port,
|
|
||||||
60
|
|
||||||
)
|
|
||||||
|
|
||||||
# 启动循环
|
|
||||||
self._client.loop_start()
|
|
||||||
self._running = True
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}"
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"MQTT 服务启动失败: {e}")
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""停止 MQTT 服务"""
|
|
||||||
if not self._running:
|
|
||||||
return
|
|
||||||
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
if self._client:
|
|
||||||
self._client.loop_stop()
|
|
||||||
self._client.disconnect()
|
|
||||||
logger.info("MQTT 服务已停止")
|
|
||||||
|
|
||||||
def _on_connect(self, client, userdata, *args):
|
|
||||||
"""连接回调 (兼容 1.x 和 2.x)"""
|
|
||||||
# 1.x: (client, userdata, flags, rc)
|
|
||||||
# 2.x: (client, userdata, connect_flags, reason_code, properties)
|
|
||||||
rc = 0
|
|
||||||
if args:
|
|
||||||
# 取倒数第二个参数(1.x 的 rc 或 2.x 的 reason_code)
|
|
||||||
if len(args) >= 2:
|
|
||||||
reason_code = args[-2]
|
|
||||||
else:
|
|
||||||
reason_code = args[0]
|
|
||||||
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)
|
|
||||||
|
|
||||||
if rc == 0:
|
|
||||||
self._connected = True
|
|
||||||
logger.info("MQTT 连接成功")
|
|
||||||
|
|
||||||
# 订阅告警主题
|
|
||||||
client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos)
|
|
||||||
logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}")
|
|
||||||
|
|
||||||
# 心跳主题通常是 edge/alert/heartbeat/# 但它已经被 edge/alert/# 包含
|
|
||||||
# 所以不需要单独订阅
|
|
||||||
else:
|
|
||||||
self._connected = False
|
|
||||||
logger.error(f"MQTT 连接失败: {reason_code}")
|
|
||||||
|
|
||||||
def _on_disconnect(self, client, userdata, *args):
|
|
||||||
"""断开连接回调 (兼容 1.x 和 2.x)"""
|
|
||||||
# 1.x: (client, userdata, rc)
|
|
||||||
# 2.x: (client, userdata, disconnect_flags, reason_code, properties)
|
|
||||||
self._connected = False
|
|
||||||
rc = 0
|
|
||||||
if args:
|
|
||||||
if len(args) >= 2:
|
|
||||||
reason_code = args[-2]
|
|
||||||
else:
|
|
||||||
reason_code = args[0]
|
|
||||||
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)
|
|
||||||
if rc != 0:
|
|
||||||
logger.warning(f"MQTT 连接异常断开: rc={rc}")
|
|
||||||
else:
|
|
||||||
logger.info("MQTT 连接断开")
|
|
||||||
|
|
||||||
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
|
|
||||||
"""消息回调"""
|
|
||||||
with self._lock:
|
|
||||||
self._stats["messages_received"] += 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
topic = msg.topic
|
|
||||||
payload = json.loads(msg.payload.decode("utf-8"))
|
|
||||||
|
|
||||||
logger.debug(f"收到 MQTT 消息: topic={topic}")
|
|
||||||
|
|
||||||
# 判断消息类型
|
|
||||||
if "/heartbeat/" in topic:
|
|
||||||
self._handle_heartbeat(topic, payload)
|
|
||||||
else:
|
|
||||||
self._handle_alert(topic, payload)
|
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
logger.error(f"MQTT 消息解析失败: {e}")
|
|
||||||
with self._lock:
|
|
||||||
self._stats["errors"] += 1
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"MQTT 消息处理失败: {e}")
|
|
||||||
with self._lock:
|
|
||||||
self._stats["errors"] += 1
|
|
||||||
|
|
||||||
def _handle_alert(self, topic: str, payload: Dict[str, Any]):
|
|
||||||
"""处理告警消息"""
|
|
||||||
with self._lock:
|
|
||||||
self._stats["alerts_received"] += 1
|
|
||||||
|
|
||||||
# 从 topic 解析 camera_id 和 roi_id
|
|
||||||
# topic 格式: edge/alert/{camera_id}/{roi_id}
|
|
||||||
parts = topic.split("/")
|
|
||||||
if len(parts) >= 4:
|
|
||||||
camera_id = parts[2]
|
|
||||||
roi_id = parts[3] if len(parts) > 3 else None
|
|
||||||
|
|
||||||
# 确保 payload 中有这些字段
|
|
||||||
payload.setdefault("camera_id", camera_id)
|
|
||||||
payload.setdefault("roi_id", roi_id)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"收到告警: camera={payload.get('camera_id')}, "
|
|
||||||
f"type={payload.get('alert_type')}, "
|
|
||||||
f"confidence={payload.get('confidence')}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# 调用所有注册的处理器
|
|
||||||
for handler in self._alert_handlers:
|
|
||||||
try:
|
|
||||||
handler(payload)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"告警处理器执行失败: {e}")
|
|
||||||
|
|
||||||
def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]):
|
|
||||||
"""处理心跳消息"""
|
|
||||||
with self._lock:
|
|
||||||
self._stats["heartbeats_received"] += 1
|
|
||||||
|
|
||||||
# 从 topic 解析 device_id
|
|
||||||
# topic 格式: edge/alert/heartbeat/{device_id}
|
|
||||||
parts = topic.split("/")
|
|
||||||
if len(parts) >= 4:
|
|
||||||
device_id = parts[3]
|
|
||||||
payload.setdefault("device_id", device_id)
|
|
||||||
|
|
||||||
logger.debug(f"收到心跳: device={payload.get('device_id')}")
|
|
||||||
|
|
||||||
# 调用所有注册的处理器
|
|
||||||
for handler in self._heartbeat_handlers:
|
|
||||||
try:
|
|
||||||
handler(payload)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"心跳处理器执行失败: {e}")
|
|
||||||
|
|
||||||
def get_statistics(self) -> Dict[str, Any]:
|
|
||||||
"""获取统计信息"""
|
|
||||||
with self._lock:
|
|
||||||
return {
|
|
||||||
**self._stats,
|
|
||||||
"connected": self._connected,
|
|
||||||
"running": self._running,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# 全局单例
|
|
||||||
_mqtt_service: Optional[MQTTService] = None
|
_mqtt_service: Optional[MQTTService] = None
|
||||||
|
|
||||||
|
|
||||||
def get_mqtt_service() -> MQTTService:
|
def get_mqtt_service() -> MQTTService:
|
||||||
"""获取 MQTT 服务单例"""
|
"""获取 MQTT 服务单例 (已废弃)"""
|
||||||
global _mqtt_service
|
global _mqtt_service
|
||||||
if _mqtt_service is None:
|
if _mqtt_service is None:
|
||||||
_mqtt_service = MQTTService()
|
_mqtt_service = MQTTService()
|
||||||
|
|||||||
@@ -11,3 +11,4 @@ httpx==0.26.0
|
|||||||
paho-mqtt==2.1.0
|
paho-mqtt==2.1.0
|
||||||
python-dotenv==1.0.1
|
python-dotenv==1.0.1
|
||||||
websockets==12.0
|
websockets==12.0
|
||||||
|
redis>=5.0.0
|
||||||
|
|||||||
Reference in New Issue
Block a user