diff --git a/app/config.py b/app/config.py index e537dcb..af077e9 100644 --- a/app/config.py +++ b/app/config.py @@ -43,20 +43,6 @@ class AIModelConfig: api_key: str = "" -@dataclass -class MQTTConfig: - """MQTT 配置 (已废弃 - 告警上报已改为 HTTP + COS)""" - broker_host: str = "localhost" - broker_port: int = 1883 - client_id: str = "alert_platform" - username: str = "" - password: str = "" - alert_topic: str = "edge/alert/#" - heartbeat_topic: str = "edge/alert/heartbeat/#" - qos: int = 1 - enabled: bool = False # 默认禁用 - - @dataclass class RedisConfig: """Redis 配置""" @@ -103,7 +89,6 @@ class Settings(BaseModel): cos: COSConfig = COSConfig() app: AppConfig = AppConfig() ai_model: AIModelConfig = AIModelConfig() - mqtt: MQTTConfig = MQTTConfig() redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() @@ -137,17 +122,6 @@ def load_settings() -> Settings: endpoint=os.getenv("AI_MODEL_ENDPOINT", ""), api_key=os.getenv("AI_MODEL_API_KEY", ""), ), - mqtt=MQTTConfig( - broker_host=os.getenv("MQTT_BROKER_HOST", "localhost"), - broker_port=int(os.getenv("MQTT_BROKER_PORT", "1883")), - client_id=os.getenv("MQTT_CLIENT_ID", "alert_platform"), - username=os.getenv("MQTT_USERNAME", ""), - password=os.getenv("MQTT_PASSWORD", ""), - alert_topic=os.getenv("MQTT_ALERT_TOPIC", "edge/alert/#"), - heartbeat_topic=os.getenv("MQTT_HEARTBEAT_TOPIC", "edge/alert/heartbeat/#"), - qos=int(os.getenv("MQTT_QOS", "1")), - enabled=os.getenv("MQTT_ENABLED", "false").lower() == "true", - ), redis=RedisConfig( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", "6379")), diff --git a/app/routers/yudao_aiot_edge.py b/app/routers/yudao_aiot_edge.py index 96b11ca..b9d8742 100644 --- a/app/routers/yudao_aiot_edge.py +++ b/app/routers/yudao_aiot_edge.py @@ -7,6 +7,9 @@ API 路径规范: - /admin-api/aiot/edge/device/page - 分页查询设备 - /admin-api/aiot/edge/device/get - 获取设备详情 - /admin-api/aiot/edge/device/statistics - 设备统计 + +注意:边缘端未实现心跳机制,因此运行时长/处理帧数等实时指标不可用。 +告警数统计从 alarm_event 表中提取 edge_node_id 字段。 """ from fastapi import APIRouter, Query, Depends, HTTPException @@ -14,6 +17,7 @@ from typing import Optional from app.yudao_compat import YudaoResponse, get_current_user from app.services.device_service import get_device_service, DeviceService +from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService router = APIRouter(prefix="/admin-api/aiot/edge", tags=["AIoT-边缘设备"]) @@ -24,9 +28,10 @@ async def get_device_page( pageSize: int = Query(20, ge=1, le=100, description="每页大小"), status: Optional[str] = Query(None, description="设备状态: online/offline/error"), service: DeviceService = Depends(get_device_service), + alarm_service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): - """分页查询边缘设备列表""" + """分页查询边缘设备列表(告警数从 alarm_event 表统计)""" devices, total = service.get_devices( status=status, page=pageNo, @@ -36,16 +41,21 @@ async def get_device_page( device_list = [] for device in devices: device_dict = device.to_dict() + device_id = device_dict.get("device_id") + + # 从 alarm_event 表统计告警数 + alerts_count = alarm_service.count_alarms_by_edge_node(device_id) + device_list.append({ "id": device_dict.get("id"), - "deviceId": device_dict.get("device_id"), + "deviceId": device_id, "deviceName": device_dict.get("device_name"), "status": device_dict.get("status"), "statusName": _get_status_name(device_dict.get("status")), "lastHeartbeat": device_dict.get("last_heartbeat"), - "uptimeSeconds": device_dict.get("uptime_seconds"), - "framesProcessed": device_dict.get("frames_processed"), - "alertsGenerated": device_dict.get("alerts_generated"), + "uptimeSeconds": None, # 无心跳机制,不可用 + "framesProcessed": None, # 无心跳机制,不可用 + "alertsGenerated": alerts_count, # 从 alarm_event 表统计 "ipAddress": device_dict.get("ip_address"), "streamCount": device_dict.get("stream_count"), "configVersion": device_dict.get("config_version"), @@ -65,24 +75,30 @@ async def get_device_page( async def get_device( id: str = Query(..., description="设备ID"), service: DeviceService = Depends(get_device_service), + alarm_service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): - """获取设备详情""" + """获取设备详情(告警数从 alarm_event 表统计)""" device = service.get_device(id) if not device: raise HTTPException(status_code=404, detail="设备不存在") device_dict = device.to_dict() + device_id = device_dict.get("device_id") + + # 从 alarm_event 表统计告警数 + alerts_count = alarm_service.count_alarms_by_edge_node(device_id) + return YudaoResponse.success({ "id": device_dict.get("id"), - "deviceId": device_dict.get("device_id"), + "deviceId": device_id, "deviceName": device_dict.get("device_name"), "status": device_dict.get("status"), "statusName": _get_status_name(device_dict.get("status")), "lastHeartbeat": device_dict.get("last_heartbeat"), - "uptimeSeconds": device_dict.get("uptime_seconds"), - "framesProcessed": device_dict.get("frames_processed"), - "alertsGenerated": device_dict.get("alerts_generated"), + "uptimeSeconds": None, # 无心跳机制,不可用 + "framesProcessed": None, # 无心跳机制,不可用 + "alertsGenerated": alerts_count, # 从 alarm_event 表统计 "ipAddress": device_dict.get("ip_address"), "streamCount": device_dict.get("stream_count"), "configVersion": device_dict.get("config_version"), diff --git a/app/schemas.py b/app/schemas.py index d2e022f..03485e7 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -108,7 +108,6 @@ class DeviceStatisticsResponse(BaseModel): class HealthResponse(BaseModel): status: str database: str - mqtt: Optional[str] = None websocket_connections: Optional[int] = None diff --git a/app/services/alarm_event_service.py b/app/services/alarm_event_service.py index 12c7a4b..2d32710 100644 --- a/app/services/alarm_event_service.py +++ b/app/services/alarm_event_service.py @@ -588,6 +588,20 @@ class AlarmEventService: finally: db.close() + def count_alarms_by_edge_node(self, edge_node_id: str) -> int: + """统计指定边缘节点的告警数量""" + db = get_session() + try: + count = db.query(AlarmEvent).filter( + AlarmEvent.edge_node_id == edge_node_id + ).count() + return count + except Exception as e: + logger.error(f"统计边缘节点告警数失败: {e}") + return 0 + finally: + db.close() + def save_llm_analysis( self, alarm_id: str, diff --git a/app/services/alert_service.py b/app/services/alert_service.py index 8024b4e..68dff8e 100644 --- a/app/services/alert_service.py +++ b/app/services/alert_service.py @@ -1,6 +1,6 @@ """ 告警服务 -处理告警 CRUD 和 MQTT 消息 +处理告警 CRUD(旧版告警系统,已迁移至 alarm_event_service) """ import uuid from datetime import datetime, timezone @@ -63,64 +63,6 @@ class AlertService: finally: db.close() - def create_alert_from_mqtt(self, mqtt_data: Dict[str, Any]) -> Optional[Alert]: - """从 MQTT 消息创建告警""" - db = get_session() - try: - # 解析时间 - trigger_time_str = mqtt_data.get("timestamp") - if trigger_time_str: - try: - trigger_time = datetime.fromisoformat(trigger_time_str.replace("Z", "+00:00")) - except ValueError: - trigger_time = datetime.now(timezone.utc) - else: - trigger_time = datetime.now(timezone.utc) - - # 解析置信度(MQTT 可能是 0-1,需要转为 0-100) - confidence = mqtt_data.get("confidence") - if confidence is not None: - if isinstance(confidence, float) and confidence <= 1: - confidence = int(confidence * 100) - else: - confidence = int(confidence) - - # 解析持续时长 - duration_minutes = mqtt_data.get("duration_minutes") - if duration_minutes is not None: - duration_minutes = int(float(duration_minutes)) - - alert = Alert( - alert_no=self.generate_alert_no(), - camera_id=mqtt_data.get("camera_id", "unknown"), - roi_id=mqtt_data.get("roi_id"), - bind_id=mqtt_data.get("bind_id"), - device_id=mqtt_data.get("device_id"), - alert_type=mqtt_data.get("alert_type", "unknown"), - algorithm=mqtt_data.get("algorithm", "YOLO"), - confidence=confidence, - duration_minutes=duration_minutes, - trigger_time=trigger_time, - message=mqtt_data.get("message"), - bbox=mqtt_data.get("bbox"), - status=AlertStatus.PENDING, - level=self._determine_level(mqtt_data), - ) - - db.add(alert) - db.commit() - db.refresh(alert) - - logger.info(f"MQTT告警创建成功: {alert.alert_no}, type={alert.alert_type}") - return alert - - except Exception as e: - db.rollback() - logger.error(f"创建MQTT告警失败: {e}") - return None - finally: - db.close() - def _determine_level(self, data: Dict[str, Any]) -> AlertLevel: """根据告警数据确定告警级别""" alert_type = data.get("alert_type", "") diff --git a/app/services/device_service.py b/app/services/device_service.py index da802a4..f70e7af 100644 --- a/app/services/device_service.py +++ b/app/services/device_service.py @@ -19,65 +19,14 @@ class DeviceService: self._devices: Dict[str, Dict[str, Any]] = {} # 内存缓存 def handle_heartbeat(self, heartbeat_data: Dict[str, Any]) -> Optional[EdgeDevice]: - """处理心跳消息""" - device_id = heartbeat_data.get("device_id") - if not device_id: - logger.warning("心跳消息缺少 device_id") - return None + """ + 处理心跳消息(已废弃 - 边缘端未实现心跳机制) - status_info = heartbeat_data.get("status", {}) - - db = get_session() - try: - # 查找或创建设备 - device = db.query(EdgeDevice).filter( - EdgeDevice.device_id == device_id - ).first() - - now = datetime.now(timezone.utc) - - if device is None: - # 新设备 - device = EdgeDevice( - device_id=device_id, - device_name=f"边缘设备_{device_id[:8]}", - status=DeviceStatus.ONLINE, - last_heartbeat=now, - ) - db.add(device) - logger.info(f"发现新设备: {device_id}") - else: - # 更新状态 - old_status = device.status - device.status = DeviceStatus.ONLINE - device.last_heartbeat = now - - if old_status != DeviceStatus.ONLINE: - logger.info(f"设备上线: {device_id}") - - # 更新运行信息 - if status_info: - device.uptime_seconds = status_info.get("uptime_seconds") - device.frames_processed = status_info.get("frames_processed") - device.alerts_generated = status_info.get("alerts_generated") - device.extra_info = status_info.get("stream_stats") - - device.updated_at = now - - db.commit() - db.refresh(device) - - # 更新内存缓存 - self._devices[device_id] = device.to_dict() - - return device - - except Exception as e: - db.rollback() - logger.error(f"处理心跳失败: {e}") - return None - finally: - db.close() + 保留此方法以兼容旧代码,实际上不会被调用。 + 边缘节点信息应通过手动创建或配置管理。 + """ + logger.warning("handle_heartbeat 被调用,但心跳机制已废弃,忽略") + return None def check_offline_devices(self) -> List[EdgeDevice]: """检查离线设备""" diff --git a/app/services/mqtt_service.py b/app/services/mqtt_service.py deleted file mode 100644 index 5baabf0..0000000 --- a/app/services/mqtt_service.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -MQTT 服务 - 已废弃 - -告警上报已改为 HTTP + COS 方案(边缘端直传)。 -此文件保留为空壳,避免其他模块 import 报错。 -后续版本将彻底删除此文件。 -""" - -from typing import Dict, Any, Optional - - -class MQTTService: - """MQTT 服务 (已废弃,保留空壳兼容旧代码)""" - - def __init__(self): - pass - - @property - def is_connected(self) -> bool: - return False - - def register_alert_handler(self, handler): - pass - - def register_heartbeat_handler(self, handler): - pass - - def start(self): - pass - - def stop(self): - pass - - def get_statistics(self) -> Dict[str, Any]: - return { - "messages_received": 0, - "alerts_received": 0, - "heartbeats_received": 0, - "errors": 0, - "connected": False, - "running": False, - "deprecated": True, - } - - -_mqtt_service: Optional[MQTTService] = None - - -def get_mqtt_service() -> MQTTService: - """获取 MQTT 服务单例 (已废弃)""" - global _mqtt_service - if _mqtt_service is None: - _mqtt_service = MQTTService() - return _mqtt_service