import asyncio import httpx from typing import Optional from app.config import settings from app.utils.logger import logger class AIAnalyzer: def __init__(self): self.endpoint = settings.ai_model.endpoint self.api_key = settings.ai_model.api_key async def analyze_alert( self, alert_id: int, snapshot_url: str, alert_info: dict, ) -> Optional[dict]: if not self.endpoint: logger.debug("AI模型端点未配置,跳过分析") return None try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{self.endpoint}/analyze", json={ "alert_id": alert_id, "snapshot_url": snapshot_url, "alert_info": alert_info, }, headers={"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}, ) response.raise_for_status() result = response.json() logger.info(f"AI分析完成: alert_id={alert_id}") return result except Exception as e: logger.error(f"AI分析失败: alert_id={alert_id}, error={e}") return None ai_analyzer = AIAnalyzer() async def trigger_async_analysis(alert_id: int, snapshot_url: str, alert_info: dict): asyncio.create_task( ai_analyzer.analyze_alert(alert_id, snapshot_url, alert_info) ) def get_ai_analyzer() -> AIAnalyzer: return ai_analyzer