From 6861fb665328acfd45ce70bc285aa4e36f562bd1 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Feb 2026 13:56:57 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E6=A0=B8=E5=BF=83?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mqtt_service: MQTT 订阅服务,对接 ai_edge 告警和心跳 - notification_service: WebSocket 实时推送服务 - device_service: 边缘设备心跳管理和离线检测 Co-Authored-By: Claude Opus 4.5 --- app/services/device_service.py | 183 ++++++++++++++++++++++ app/services/mqtt_service.py | 226 +++++++++++++++++++++++++++ app/services/notification_service.py | 145 +++++++++++++++++ 3 files changed, 554 insertions(+) create mode 100644 app/services/device_service.py create mode 100644 app/services/mqtt_service.py create mode 100644 app/services/notification_service.py diff --git a/app/services/device_service.py b/app/services/device_service.py new file mode 100644 index 0000000..da802a4 --- /dev/null +++ b/app/services/device_service.py @@ -0,0 +1,183 @@ +""" +设备服务 +管理边缘设备状态 +""" +from datetime import datetime, timezone, timedelta +from typing import Dict, Any, List, Optional + +from app.models import EdgeDevice, DeviceStatus, get_session +from app.utils.logger import logger + + +class DeviceService: + """设备服务""" + + # 设备离线超时时间(秒) + OFFLINE_TIMEOUT = 90 + + def __init__(self): + self._devices: Dict[str, Dict[str, Any]] = {} # 内存缓存 + + def handle_heartbeat(self, heartbeat_data: Dict[str, Any]) -> Optional[EdgeDevice]: + """处理心跳消息""" + device_id = heartbeat_data.get("device_id") + if not device_id: + logger.warning("心跳消息缺少 device_id") + return None + + status_info = heartbeat_data.get("status", {}) + + db = get_session() + try: + # 查找或创建设备 + device = db.query(EdgeDevice).filter( + EdgeDevice.device_id == device_id + ).first() + + now = datetime.now(timezone.utc) + + if device is None: + # 新设备 + device = EdgeDevice( + device_id=device_id, + device_name=f"边缘设备_{device_id[:8]}", + status=DeviceStatus.ONLINE, + last_heartbeat=now, + ) + db.add(device) + logger.info(f"发现新设备: {device_id}") + else: + # 更新状态 + old_status = device.status + device.status = DeviceStatus.ONLINE + device.last_heartbeat = now + + if old_status != DeviceStatus.ONLINE: + logger.info(f"设备上线: {device_id}") + + # 更新运行信息 + if status_info: + device.uptime_seconds = status_info.get("uptime_seconds") + device.frames_processed = status_info.get("frames_processed") + device.alerts_generated = status_info.get("alerts_generated") + device.extra_info = status_info.get("stream_stats") + + device.updated_at = now + + db.commit() + db.refresh(device) + + # 更新内存缓存 + self._devices[device_id] = device.to_dict() + + return device + + except Exception as e: + db.rollback() + logger.error(f"处理心跳失败: {e}") + return None + finally: + db.close() + + def check_offline_devices(self) -> List[EdgeDevice]: + """检查离线设备""" + db = get_session() + try: + threshold = datetime.now(timezone.utc) - timedelta(seconds=self.OFFLINE_TIMEOUT) + + # 查找需要标记为离线的设备 + devices = db.query(EdgeDevice).filter( + EdgeDevice.status == DeviceStatus.ONLINE, + EdgeDevice.last_heartbeat < threshold + ).all() + + offline_devices = [] + for device in devices: + device.status = DeviceStatus.OFFLINE + device.updated_at = datetime.now(timezone.utc) + offline_devices.append(device) + logger.info(f"设备离线: {device.device_id}") + + if offline_devices: + db.commit() + + return offline_devices + + except Exception as e: + db.rollback() + logger.error(f"检查离线设备失败: {e}") + return [] + finally: + db.close() + + def get_device(self, device_id: str) -> Optional[EdgeDevice]: + """获取设备信息""" + db = get_session() + try: + return db.query(EdgeDevice).filter( + EdgeDevice.device_id == device_id + ).first() + finally: + db.close() + + def get_devices( + self, + status: Optional[str] = None, + page: int = 1, + page_size: int = 20 + ) -> tuple[List[EdgeDevice], int]: + """获取设备列表""" + db = get_session() + try: + query = db.query(EdgeDevice) + + if status: + query = query.filter(EdgeDevice.status == status) + + total = query.count() + devices = ( + query.order_by(EdgeDevice.last_heartbeat.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + return devices, total + finally: + db.close() + + def get_statistics(self) -> Dict[str, Any]: + """获取设备统计""" + db = get_session() + try: + total = db.query(EdgeDevice).count() + online = db.query(EdgeDevice).filter( + EdgeDevice.status == DeviceStatus.ONLINE + ).count() + offline = db.query(EdgeDevice).filter( + EdgeDevice.status == DeviceStatus.OFFLINE + ).count() + error = db.query(EdgeDevice).filter( + EdgeDevice.status == DeviceStatus.ERROR + ).count() + + return { + "total": total, + "online": online, + "offline": offline, + "error": error, + } + finally: + db.close() + + +# 全局单例 +_device_service = None + + +def get_device_service() -> DeviceService: + """获取设备服务单例""" + global _device_service + if _device_service is None: + _device_service = DeviceService() + return _device_service diff --git a/app/services/mqtt_service.py b/app/services/mqtt_service.py new file mode 100644 index 0000000..e19510a --- /dev/null +++ b/app/services/mqtt_service.py @@ -0,0 +1,226 @@ +""" +MQTT 订阅服务 +订阅边缘端告警和心跳消息 +""" +import json +import threading +from datetime import datetime, timezone +from typing import Callable, Dict, Any, Optional, List + +import paho.mqtt.client as mqtt + +from app.config import settings +from app.utils.logger import logger + + +class MQTTService: + """MQTT 订阅服务""" + + def __init__(self): + self._client: Optional[mqtt.Client] = None + self._connected = False + self._running = False + self._lock = threading.Lock() + + # 回调函数 + self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = [] + self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = [] + + # 统计 + self._stats = { + "messages_received": 0, + "alerts_received": 0, + "heartbeats_received": 0, + "errors": 0, + } + + @property + def is_connected(self) -> bool: + return self._connected + + def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]): + """注册告警处理回调""" + self._alert_handlers.append(handler) + logger.info(f"已注册告警处理器: {handler.__name__}") + + def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]): + """注册心跳处理回调""" + self._heartbeat_handlers.append(handler) + logger.info(f"已注册心跳处理器: {handler.__name__}") + + def start(self): + """启动 MQTT 服务""" + if not settings.mqtt.enabled: + logger.info("MQTT 服务已禁用") + return + + if self._running: + return + + try: + self._client = mqtt.Client( + client_id=settings.mqtt.client_id, + protocol=mqtt.MQTTv5, + callback_api_version=mqtt.CallbackAPIVersion.VERSION2 + ) + + # 设置回调 + self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect + self._client.on_message = self._on_message + + # 设置认证(如果有) + if settings.mqtt.username: + self._client.username_pw_set( + settings.mqtt.username, + settings.mqtt.password + ) + + # 连接 + self._client.connect( + settings.mqtt.broker_host, + settings.mqtt.broker_port, + 60 + ) + + # 启动循环 + self._client.loop_start() + self._running = True + + logger.info( + f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}" + ) + + except Exception as e: + logger.error(f"MQTT 服务启动失败: {e}") + self._running = False + + def stop(self): + """停止 MQTT 服务""" + if not self._running: + return + + self._running = False + + if self._client: + self._client.loop_stop() + self._client.disconnect() + logger.info("MQTT 服务已停止") + + def _on_connect(self, client, userdata, flags, reason_code, properties): + """连接回调""" + if reason_code == 0: + self._connected = True + logger.info("MQTT 连接成功") + + # 订阅告警主题 + client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos) + logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}") + + # 心跳主题通常是 edge/alert/heartbeat/# 但它已经被 edge/alert/# 包含 + # 所以不需要单独订阅 + else: + self._connected = False + logger.error(f"MQTT 连接失败: {reason_code}") + + def _on_disconnect(self, client, userdata, reason_code, properties): + """断开连接回调""" + self._connected = False + logger.warning(f"MQTT 连接断开: {reason_code}") + + def _on_message(self, client, userdata, msg: mqtt.MQTTMessage): + """消息回调""" + with self._lock: + self._stats["messages_received"] += 1 + + try: + topic = msg.topic + payload = json.loads(msg.payload.decode("utf-8")) + + logger.debug(f"收到 MQTT 消息: topic={topic}") + + # 判断消息类型 + if "/heartbeat/" in topic: + self._handle_heartbeat(topic, payload) + else: + self._handle_alert(topic, payload) + + except json.JSONDecodeError as e: + logger.error(f"MQTT 消息解析失败: {e}") + with self._lock: + self._stats["errors"] += 1 + except Exception as e: + logger.error(f"MQTT 消息处理失败: {e}") + with self._lock: + self._stats["errors"] += 1 + + def _handle_alert(self, topic: str, payload: Dict[str, Any]): + """处理告警消息""" + with self._lock: + self._stats["alerts_received"] += 1 + + # 从 topic 解析 camera_id 和 roi_id + # topic 格式: edge/alert/{camera_id}/{roi_id} + parts = topic.split("/") + if len(parts) >= 4: + camera_id = parts[2] + roi_id = parts[3] if len(parts) > 3 else None + + # 确保 payload 中有这些字段 + payload.setdefault("camera_id", camera_id) + payload.setdefault("roi_id", roi_id) + + logger.info( + f"收到告警: camera={payload.get('camera_id')}, " + f"type={payload.get('alert_type')}, " + f"confidence={payload.get('confidence')}" + ) + + # 调用所有注册的处理器 + for handler in self._alert_handlers: + try: + handler(payload) + except Exception as e: + logger.error(f"告警处理器执行失败: {e}") + + def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]): + """处理心跳消息""" + with self._lock: + self._stats["heartbeats_received"] += 1 + + # 从 topic 解析 device_id + # topic 格式: edge/alert/heartbeat/{device_id} + parts = topic.split("/") + if len(parts) >= 4: + device_id = parts[3] + payload.setdefault("device_id", device_id) + + logger.debug(f"收到心跳: device={payload.get('device_id')}") + + # 调用所有注册的处理器 + for handler in self._heartbeat_handlers: + try: + handler(payload) + except Exception as e: + logger.error(f"心跳处理器执行失败: {e}") + + def get_statistics(self) -> Dict[str, Any]: + """获取统计信息""" + with self._lock: + return { + **self._stats, + "connected": self._connected, + "running": self._running, + } + + +# 全局单例 +_mqtt_service: Optional[MQTTService] = None + + +def get_mqtt_service() -> MQTTService: + """获取 MQTT 服务单例""" + global _mqtt_service + if _mqtt_service is None: + _mqtt_service = MQTTService() + return _mqtt_service diff --git a/app/services/notification_service.py b/app/services/notification_service.py new file mode 100644 index 0000000..c5e2ce3 --- /dev/null +++ b/app/services/notification_service.py @@ -0,0 +1,145 @@ +""" +通知服务 +支持 WebSocket 实时推送 +""" +import asyncio +import json +from datetime import datetime +from typing import Dict, Any, List, Set +from fastapi import WebSocket + +from app.utils.logger import logger + + +class ConnectionManager: + """WebSocket 连接管理器""" + + def __init__(self): + # 活跃连接 + self._active_connections: Set[WebSocket] = set() + + async def connect(self, websocket: WebSocket): + """接受新连接""" + await websocket.accept() + self._active_connections.add(websocket) + logger.info(f"WebSocket 客户端已连接, 当前连接数: {len(self._active_connections)}") + + def disconnect(self, websocket: WebSocket): + """断开连接""" + self._active_connections.discard(websocket) + logger.info(f"WebSocket 客户端已断开, 当前连接数: {len(self._active_connections)}") + + async def broadcast(self, message: Dict[str, Any]): + """广播消息到所有客户端""" + if not self._active_connections: + return + + message_json = json.dumps(message, ensure_ascii=False, default=str) + disconnected = set() + + for connection in self._active_connections: + try: + await connection.send_text(message_json) + except Exception as e: + logger.warning(f"WebSocket 发送失败: {e}") + disconnected.add(connection) + + # 移除断开的连接 + self._active_connections -= disconnected + + async def send_personal(self, websocket: WebSocket, message: Dict[str, Any]): + """发送消息到指定客户端""" + try: + message_json = json.dumps(message, ensure_ascii=False, default=str) + await websocket.send_text(message_json) + except Exception as e: + logger.warning(f"WebSocket 发送失败: {e}") + + @property + def connection_count(self) -> int: + return len(self._active_connections) + + +class NotificationService: + """通知服务""" + + def __init__(self): + self._manager = ConnectionManager() + self._loop: asyncio.AbstractEventLoop = None + + @property + def manager(self) -> ConnectionManager: + return self._manager + + def set_event_loop(self, loop: asyncio.AbstractEventLoop): + """设置事件循环(用于从同步代码调用异步方法)""" + self._loop = loop + + async def notify_new_alert(self, alert_data: Dict[str, Any]): + """通知新告警""" + message = { + "event": "new_alert", + "data": alert_data, + "timestamp": datetime.utcnow().isoformat(), + } + await self._manager.broadcast(message) + logger.debug(f"已广播新告警通知: {alert_data.get('alert_no', 'N/A')}") + + async def notify_alert_updated(self, alert_data: Dict[str, Any]): + """通知告警更新""" + message = { + "event": "alert_updated", + "data": alert_data, + "timestamp": datetime.utcnow().isoformat(), + } + await self._manager.broadcast(message) + + async def notify_device_status(self, device_data: Dict[str, Any]): + """通知设备状态变更""" + message = { + "event": "device_status", + "data": device_data, + "timestamp": datetime.utcnow().isoformat(), + } + await self._manager.broadcast(message) + + async def notify_work_order(self, order_data: Dict[str, Any], event_type: str = "new"): + """通知工单事件""" + message = { + "event": f"work_order_{event_type}", + "data": order_data, + "timestamp": datetime.utcnow().isoformat(), + } + await self._manager.broadcast(message) + + def notify_sync(self, event: str, data: Dict[str, Any]): + """同步方式通知(用于从 MQTT 回调等同步代码调用)""" + if self._loop is None: + logger.warning("事件循环未设置,无法发送通知") + return + + message = { + "event": event, + "data": data, + "timestamp": datetime.utcnow().isoformat(), + } + + async def _broadcast(): + await self._manager.broadcast(message) + + try: + asyncio.run_coroutine_threadsafe(_broadcast(), self._loop) + except Exception as e: + logger.error(f"同步通知失败: {e}") + + +# 全局单例 +_notification_service = None + + +def get_notification_service() -> NotificationService: + """获取通知服务单例""" + global _notification_service + if _notification_service is None: + _notification_service = NotificationService() + return _notification_service