Files
iot-device-management-service/app/services/notification_service.py
16337 6861fb6653 feat: 新增核心服务模块
- mqtt_service: MQTT 订阅服务,对接 ai_edge 告警和心跳
- notification_service: WebSocket 实时推送服务
- device_service: 边缘设备心跳管理和离线检测

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:56:57 +08:00

146 lines
4.6 KiB
Python

"""
通知服务
支持 WebSocket 实时推送
"""
import asyncio
import json
from datetime import datetime
from typing import Dict, Any, List, Set
from fastapi import WebSocket
from app.utils.logger import logger
class ConnectionManager:
    """Track active WebSocket connections and push JSON messages to them."""

    def __init__(self):
        # Connections that have been accepted and not yet removed.
        self._active_connections: Set[WebSocket] = set()

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self._active_connections.add(websocket)
        logger.info(f"WebSocket 客户端已连接, 当前连接数: {len(self._active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Unregister a connection; a connection that is not registered is ignored."""
        self._active_connections.discard(websocket)
        logger.info(f"WebSocket 客户端已断开, 当前连接数: {len(self._active_connections)}")

    async def broadcast(self, message: Dict[str, Any]):
        """Send ``message`` as JSON text to every registered connection.

        The message is serialized once (non-JSON values fall back to ``str``).
        Connections whose send raises are logged and unregistered; the
        failure is not propagated to the caller.
        """
        if not self._active_connections:
            return

        message_json = json.dumps(message, ensure_ascii=False, default=str)
        failed = set()

        # Iterate over a snapshot: every ``await`` below yields to the event
        # loop, and a connect()/disconnect() running in that window mutates
        # the live set, which would raise "Set changed size during iteration".
        for connection in tuple(self._active_connections):
            if connection not in self._active_connections:
                # Unregistered while an earlier send was in flight.
                continue
            try:
                await connection.send_text(message_json)
            except Exception as e:
                logger.warning(f"WebSocket 发送失败: {e}")
                failed.add(connection)

        # Drop the connections that failed during this broadcast.
        self._active_connections.difference_update(failed)

    async def send_personal(self, websocket: WebSocket, message: Dict[str, Any]):
        """Send ``message`` as JSON text to one client.

        Serialization and send errors are logged, not raised.
        """
        try:
            message_json = json.dumps(message, ensure_ascii=False, default=str)
            await websocket.send_text(message_json)
        except Exception as e:
            logger.warning(f"WebSocket 发送失败: {e}")

    @property
    def connection_count(self) -> int:
        """Number of currently registered connections."""
        return len(self._active_connections)
class NotificationService:
    """Build event messages and broadcast them to WebSocket clients.

    Async callers use the ``notify_*`` coroutines. Synchronous callers use
    ``notify_sync``, which schedules the broadcast on the loop registered
    through ``set_event_loop``.
    """

    def __init__(self):
        self._manager = ConnectionManager()
        # Loop used by notify_sync(); stays None until set_event_loop() is called.
        self._loop: asyncio.AbstractEventLoop = None

    @property
    def manager(self) -> ConnectionManager:
        """The connection manager used for all broadcasts."""
        return self._manager

    def set_event_loop(self, loop: asyncio.AbstractEventLoop):
        """Register the event loop on which notify_sync() schedules broadcasts."""
        self._loop = loop

    @staticmethod
    def _build_message(event: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap ``data`` in the common envelope: event, data, timestamp."""
        return {
            "event": event,
            "data": data,
            # Naive UTC time in ISO-8601 form (no offset suffix).
            "timestamp": datetime.utcnow().isoformat(),
        }

    async def notify_new_alert(self, alert_data: Dict[str, Any]):
        """Broadcast a ``new_alert`` event carrying ``alert_data``."""
        await self._manager.broadcast(self._build_message("new_alert", alert_data))
        logger.debug(f"已广播新告警通知: {alert_data.get('alert_no', 'N/A')}")

    async def notify_alert_updated(self, alert_data: Dict[str, Any]):
        """Broadcast an ``alert_updated`` event carrying ``alert_data``."""
        await self._manager.broadcast(self._build_message("alert_updated", alert_data))

    async def notify_device_status(self, device_data: Dict[str, Any]):
        """Broadcast a ``device_status`` event carrying ``device_data``."""
        await self._manager.broadcast(self._build_message("device_status", device_data))

    async def notify_work_order(self, order_data: Dict[str, Any], event_type: str = "new"):
        """Broadcast a ``work_order_<event_type>`` event carrying ``order_data``."""
        await self._manager.broadcast(
            self._build_message(f"work_order_{event_type}", order_data)
        )

    def notify_sync(self, event: str, data: Dict[str, Any]):
        """Schedule a broadcast from synchronous code.

        Does nothing except log a warning when no event loop has been
        registered. Scheduling errors and errors raised by the broadcast
        itself are logged; nothing is raised to the caller and the call does
        not wait for the broadcast to finish.
        """
        loop = self._loop
        if loop is None:
            logger.warning("事件循环未设置,无法发送通知")
            return

        coro = self._manager.broadcast(self._build_message(event, data))
        try:
            future = asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception as e:
            # The coroutine was never handed to the loop (e.g. the loop is
            # closed); close it so it is not reported as "never awaited".
            coro.close()
            logger.error(f"同步通知失败: {e}")
            return

        # The result is not awaited here, so without a callback an exception
        # raised inside the broadcast would be lost silently.
        future.add_done_callback(self._log_sync_failure)

    @staticmethod
    def _log_sync_failure(future):
        """Log the exception, if any, of a broadcast scheduled by notify_sync()."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"同步通知失败: {exc}")
# Process-wide singleton, created lazily by get_notification_service()
_notification_service = None


def get_notification_service() -> NotificationService:
    """Return the shared NotificationService, creating it on first call."""
    global _notification_service
    if _notification_service is not None:
        return _notification_service
    _notification_service = NotificationService()
    return _notification_service