diff --git a/app/services/alert_service.py b/app/services/alert_service.py index c7ab999..6da10e3 100644 --- a/app/services/alert_service.py +++ b/app/services/alert_service.py @@ -1,18 +1,25 @@ +""" +告警服务 +处理告警 CRUD 和 MQTT 消息 +""" import uuid from datetime import datetime, timezone -from typing import Optional, List +from typing import Optional, List, Dict, Any -from app.models import Alert, AlertStatus, get_session +from app.models import Alert, AlertStatus, AlertLevel, get_session from app.schemas import AlertCreate, AlertHandleRequest from app.services.oss_storage import get_oss_storage from app.utils.logger import logger class AlertService: + """告警服务""" + def __init__(self): self.oss = get_oss_storage() def generate_alert_no(self) -> str: + """生成告警编号""" timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") unique_id = uuid.uuid4().hex[:8].upper() return f"ALT{timestamp}{unique_id}" @@ -22,19 +29,24 @@ class AlertService: alert_data: AlertCreate, snapshot_data: Optional[bytes] = None, ) -> Alert: + """创建告警(HTTP 方式)""" db = get_session() try: alert = Alert( alert_no=self.generate_alert_no(), camera_id=alert_data.camera_id, roi_id=alert_data.roi_id, + bind_id=getattr(alert_data, 'bind_id', None), + device_id=getattr(alert_data, 'device_id', None), alert_type=alert_data.alert_type, algorithm=alert_data.algorithm, confidence=alert_data.confidence, duration_minutes=alert_data.duration_minutes, trigger_time=alert_data.trigger_time, message=alert_data.message, + bbox=getattr(alert_data, 'bbox', None), status=AlertStatus.PENDING, + level=AlertLevel.MEDIUM, ) if snapshot_data: @@ -51,33 +63,129 @@ class AlertService: finally: db.close() + def create_alert_from_mqtt(self, mqtt_data: Dict[str, Any]) -> Optional[Alert]: + """从 MQTT 消息创建告警""" + db = get_session() + try: + # 解析时间 + trigger_time_str = mqtt_data.get("timestamp") + if trigger_time_str: + try: + trigger_time = datetime.fromisoformat(trigger_time_str.replace("Z", "+00:00")) + except ValueError: + trigger_time = datetime.now(timezone.utc) + else: + trigger_time = datetime.now(timezone.utc) + + # 解析置信度(MQTT 可能是 0-1,需要转为 0-100) + confidence = mqtt_data.get("confidence") + if confidence is not None: + if isinstance(confidence, float) and confidence <= 1: + confidence = int(confidence * 100) + else: + confidence = int(confidence) + + # 解析持续时长 + duration_minutes = mqtt_data.get("duration_minutes") + if duration_minutes is not None: + duration_minutes = int(float(duration_minutes)) + + alert = Alert( + alert_no=self.generate_alert_no(), + camera_id=mqtt_data.get("camera_id", "unknown"), + roi_id=mqtt_data.get("roi_id"), + bind_id=mqtt_data.get("bind_id"), + device_id=mqtt_data.get("device_id"), + alert_type=mqtt_data.get("alert_type", "unknown"), + algorithm=mqtt_data.get("algorithm", "YOLO"), + confidence=confidence, + duration_minutes=duration_minutes, + trigger_time=trigger_time, + message=mqtt_data.get("message"), + bbox=mqtt_data.get("bbox"), + status=AlertStatus.PENDING, + level=self._determine_level(mqtt_data), + ) + + db.add(alert) + db.commit() + db.refresh(alert) + + logger.info(f"MQTT告警创建成功: {alert.alert_no}, type={alert.alert_type}") + return alert + + except Exception as e: + db.rollback() + logger.error(f"创建MQTT告警失败: {e}") + return None + finally: + db.close() + + def _determine_level(self, data: Dict[str, Any]) -> AlertLevel: + """根据告警数据确定告警级别""" + alert_type = data.get("alert_type", "") + confidence = data.get("confidence", 0) + + # 根据告警类型和置信度确定级别 + if alert_type == "intrusion": + return AlertLevel.HIGH + elif alert_type == "leave_post": + duration = data.get("duration_minutes", 0) + if duration and duration > 30: + return AlertLevel.HIGH + elif duration and duration > 10: + return AlertLevel.MEDIUM + return AlertLevel.LOW + elif confidence and confidence > 0.9: + return AlertLevel.HIGH + elif confidence and confidence > 0.7: + return AlertLevel.MEDIUM + + return AlertLevel.MEDIUM + def get_alert(self, alert_id: int) -> Optional[Alert]: + """获取告警详情""" db = get_session() try: return db.query(Alert).filter(Alert.id == alert_id).first() finally: db.close() + def get_alert_by_no(self, alert_no: str) -> Optional[Alert]: + """根据告警编号获取""" + db = get_session() + try: + return db.query(Alert).filter(Alert.alert_no == alert_no).first() + finally: + db.close() + def get_alerts( self, camera_id: Optional[str] = None, + device_id: Optional[str] = None, alert_type: Optional[str] = None, status: Optional[str] = None, + level: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, page: int = 1, page_size: int = 20, ) -> tuple[List[Alert], int]: + """获取告警列表""" db = get_session() try: query = db.query(Alert) if camera_id: query = query.filter(Alert.camera_id == camera_id) + if device_id: + query = query.filter(Alert.device_id == device_id) if alert_type: query = query.filter(Alert.alert_type == alert_type) if status: query = query.filter(Alert.status == status) + if level: + query = query.filter(Alert.level == level) if start_time: query = query.filter(Alert.trigger_time >= start_time) if end_time: @@ -101,6 +209,7 @@ class AlertService: handle_data: AlertHandleRequest, handled_by: Optional[str] = None, ) -> Optional[Alert]: + """处理告警""" db = get_session() try: alert = db.query(Alert).filter(Alert.id == alert_id).first() @@ -122,7 +231,50 @@ class AlertService: finally: db.close() + def dispatch_alert(self, alert_id: int, work_order_id: int) -> Optional[Alert]: + """派发告警(关联工单)""" + db = get_session() + try: + alert = db.query(Alert).filter(Alert.id == alert_id).first() + if not alert: + return None + + alert.status = AlertStatus.DISPATCHED + alert.work_order_id = work_order_id + alert.updated_at = datetime.now(timezone.utc) + + db.commit() + db.refresh(alert) + + logger.info(f"告警已派单: {alert.alert_no} -> 工单 {work_order_id}") + return alert + + finally: + db.close() + + def delete_alert(self, alert_id: int) -> bool: + """删除告警""" + db = get_session() + try: + alert = db.query(Alert).filter(Alert.id == alert_id).first() + if not alert: + return False + + db.delete(alert) + db.commit() + + logger.info(f"告警已删除: {alert.alert_no}") + return True + + except Exception as e: + db.rollback() + logger.error(f"删除告警失败: {e}") + return False + finally: + db.close() + def get_statistics(self) -> dict: + """获取告警统计""" db = get_session() try: total = db.query(Alert).count() @@ -130,24 +282,32 @@ class AlertService: confirmed = db.query(Alert).filter(Alert.status == AlertStatus.CONFIRMED).count() ignored = db.query(Alert).filter(Alert.status == AlertStatus.IGNORED).count() resolved = db.query(Alert).filter(Alert.status == AlertStatus.RESOLVED).count() + dispatched = db.query(Alert).filter(Alert.status == AlertStatus.DISPATCHED).count() by_type = {} for alert in db.query(Alert.alert_type).distinct(): alert_type = alert[0] by_type[alert_type] = db.query(Alert).filter(Alert.alert_type == alert_type).count() + by_level = {} + for level in AlertLevel: + by_level[level.value] = db.query(Alert).filter(Alert.level == level).count() + return { "total": total, "pending": pending, "confirmed": confirmed, "ignored": ignored, "resolved": resolved, + "dispatched": dispatched, "by_type": by_type, + "by_level": by_level, } finally: db.close() def update_ai_analysis(self, alert_id: int, analysis: dict) -> Optional[Alert]: + """更新 AI 分析结果""" db = get_session() try: alert = db.query(Alert).filter(Alert.id == alert_id).first() @@ -160,8 +320,10 @@ class AlertService: db.close() +# 全局单例 alert_service = AlertService() def get_alert_service() -> AlertService: + """获取告警服务单例""" return alert_service