Files
iot-device-management-service/app/services/device_service.py
16337 6861fb6653 feat: 新增核心服务模块
- mqtt_service: MQTT 订阅服务,对接 ai_edge 告警和心跳
- notification_service: WebSocket 实时推送服务
- device_service: 边缘设备心跳管理和离线检测

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:56:57 +08:00

184 lines
5.4 KiB
Python

"""
设备服务
管理边缘设备状态
"""
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional
from app.models import EdgeDevice, DeviceStatus, get_session
from app.utils.logger import logger
class DeviceService:
"""设备服务"""
# 设备离线超时时间(秒)
OFFLINE_TIMEOUT = 90
def __init__(self):
self._devices: Dict[str, Dict[str, Any]] = {} # 内存缓存
def handle_heartbeat(self, heartbeat_data: Dict[str, Any]) -> Optional[EdgeDevice]:
"""处理心跳消息"""
device_id = heartbeat_data.get("device_id")
if not device_id:
logger.warning("心跳消息缺少 device_id")
return None
status_info = heartbeat_data.get("status", {})
db = get_session()
try:
# 查找或创建设备
device = db.query(EdgeDevice).filter(
EdgeDevice.device_id == device_id
).first()
now = datetime.now(timezone.utc)
if device is None:
# 新设备
device = EdgeDevice(
device_id=device_id,
device_name=f"边缘设备_{device_id[:8]}",
status=DeviceStatus.ONLINE,
last_heartbeat=now,
)
db.add(device)
logger.info(f"发现新设备: {device_id}")
else:
# 更新状态
old_status = device.status
device.status = DeviceStatus.ONLINE
device.last_heartbeat = now
if old_status != DeviceStatus.ONLINE:
logger.info(f"设备上线: {device_id}")
# 更新运行信息
if status_info:
device.uptime_seconds = status_info.get("uptime_seconds")
device.frames_processed = status_info.get("frames_processed")
device.alerts_generated = status_info.get("alerts_generated")
device.extra_info = status_info.get("stream_stats")
device.updated_at = now
db.commit()
db.refresh(device)
# 更新内存缓存
self._devices[device_id] = device.to_dict()
return device
except Exception as e:
db.rollback()
logger.error(f"处理心跳失败: {e}")
return None
finally:
db.close()
def check_offline_devices(self) -> List[EdgeDevice]:
"""检查离线设备"""
db = get_session()
try:
threshold = datetime.now(timezone.utc) - timedelta(seconds=self.OFFLINE_TIMEOUT)
# 查找需要标记为离线的设备
devices = db.query(EdgeDevice).filter(
EdgeDevice.status == DeviceStatus.ONLINE,
EdgeDevice.last_heartbeat < threshold
).all()
offline_devices = []
for device in devices:
device.status = DeviceStatus.OFFLINE
device.updated_at = datetime.now(timezone.utc)
offline_devices.append(device)
logger.info(f"设备离线: {device.device_id}")
if offline_devices:
db.commit()
return offline_devices
except Exception as e:
db.rollback()
logger.error(f"检查离线设备失败: {e}")
return []
finally:
db.close()
def get_device(self, device_id: str) -> Optional[EdgeDevice]:
"""获取设备信息"""
db = get_session()
try:
return db.query(EdgeDevice).filter(
EdgeDevice.device_id == device_id
).first()
finally:
db.close()
def get_devices(
self,
status: Optional[str] = None,
page: int = 1,
page_size: int = 20
) -> tuple[List[EdgeDevice], int]:
"""获取设备列表"""
db = get_session()
try:
query = db.query(EdgeDevice)
if status:
query = query.filter(EdgeDevice.status == status)
total = query.count()
devices = (
query.order_by(EdgeDevice.last_heartbeat.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
return devices, total
finally:
db.close()
def get_statistics(self) -> Dict[str, Any]:
"""获取设备统计"""
db = get_session()
try:
total = db.query(EdgeDevice).count()
online = db.query(EdgeDevice).filter(
EdgeDevice.status == DeviceStatus.ONLINE
).count()
offline = db.query(EdgeDevice).filter(
EdgeDevice.status == DeviceStatus.OFFLINE
).count()
error = db.query(EdgeDevice).filter(
EdgeDevice.status == DeviceStatus.ERROR
).count()
return {
"total": total,
"online": online,
"offline": offline,
"error": error,
}
finally:
db.close()
# 全局单例
_device_service = None
def get_device_service() -> DeviceService:
"""获取设备服务单例"""
global _device_service
if _device_service is None:
_device_service = DeviceService()
return _device_service