""" 设备服务 管理边缘设备状态 """ from datetime import datetime, timezone, timedelta from typing import Dict, Any, List, Optional from app.models import EdgeDevice, DeviceStatus, get_session from app.utils.logger import logger class DeviceService: """设备服务""" # 设备离线超时时间(秒) OFFLINE_TIMEOUT = 90 def __init__(self): self._devices: Dict[str, Dict[str, Any]] = {} # 内存缓存 def handle_heartbeat(self, heartbeat_data: Dict[str, Any]) -> Optional[EdgeDevice]: """处理心跳消息""" device_id = heartbeat_data.get("device_id") if not device_id: logger.warning("心跳消息缺少 device_id") return None status_info = heartbeat_data.get("status", {}) db = get_session() try: # 查找或创建设备 device = db.query(EdgeDevice).filter( EdgeDevice.device_id == device_id ).first() now = datetime.now(timezone.utc) if device is None: # 新设备 device = EdgeDevice( device_id=device_id, device_name=f"边缘设备_{device_id[:8]}", status=DeviceStatus.ONLINE, last_heartbeat=now, ) db.add(device) logger.info(f"发现新设备: {device_id}") else: # 更新状态 old_status = device.status device.status = DeviceStatus.ONLINE device.last_heartbeat = now if old_status != DeviceStatus.ONLINE: logger.info(f"设备上线: {device_id}") # 更新运行信息 if status_info: device.uptime_seconds = status_info.get("uptime_seconds") device.frames_processed = status_info.get("frames_processed") device.alerts_generated = status_info.get("alerts_generated") device.extra_info = status_info.get("stream_stats") device.updated_at = now db.commit() db.refresh(device) # 更新内存缓存 self._devices[device_id] = device.to_dict() return device except Exception as e: db.rollback() logger.error(f"处理心跳失败: {e}") return None finally: db.close() def check_offline_devices(self) -> List[EdgeDevice]: """检查离线设备""" db = get_session() try: threshold = datetime.now(timezone.utc) - timedelta(seconds=self.OFFLINE_TIMEOUT) # 查找需要标记为离线的设备 devices = db.query(EdgeDevice).filter( EdgeDevice.status == DeviceStatus.ONLINE, EdgeDevice.last_heartbeat < threshold ).all() offline_devices = [] for device in devices: device.status = DeviceStatus.OFFLINE device.updated_at = datetime.now(timezone.utc) offline_devices.append(device) logger.info(f"设备离线: {device.device_id}") if offline_devices: db.commit() return offline_devices except Exception as e: db.rollback() logger.error(f"检查离线设备失败: {e}") return [] finally: db.close() def get_device(self, device_id: str) -> Optional[EdgeDevice]: """获取设备信息""" db = get_session() try: return db.query(EdgeDevice).filter( EdgeDevice.device_id == device_id ).first() finally: db.close() def get_devices( self, status: Optional[str] = None, page: int = 1, page_size: int = 20 ) -> tuple[List[EdgeDevice], int]: """获取设备列表""" db = get_session() try: query = db.query(EdgeDevice) if status: query = query.filter(EdgeDevice.status == status) total = query.count() devices = ( query.order_by(EdgeDevice.last_heartbeat.desc()) .offset((page - 1) * page_size) .limit(page_size) .all() ) return devices, total finally: db.close() def get_statistics(self) -> Dict[str, Any]: """获取设备统计""" db = get_session() try: total = db.query(EdgeDevice).count() online = db.query(EdgeDevice).filter( EdgeDevice.status == DeviceStatus.ONLINE ).count() offline = db.query(EdgeDevice).filter( EdgeDevice.status == DeviceStatus.OFFLINE ).count() error = db.query(EdgeDevice).filter( EdgeDevice.status == DeviceStatus.ERROR ).count() return { "total": total, "online": online, "offline": offline, "error": error, } finally: db.close() # 全局单例 _device_service = None def get_device_service() -> DeviceService: """获取设备服务单例""" global _device_service if _device_service is None: _device_service = DeviceService() return _device_service