""" 告警查询工具:统计、列表、详情 """ import json from datetime import timedelta from typing import Optional from langchain_core.tools import tool from langchain_core.runnables import RunnableConfig from app.utils.logger import logger from app.utils.timezone import beijing_now # 告警类型/级别/状态 中文映射 ALARM_TYPE_NAMES = { "leave_post": "人员离岗", "intrusion": "周界入侵", "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵", } ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"} ALARM_STATUS_NAMES = { "NEW": "待处理", "CONFIRMED": "处理中", "FALSE": "误报", "CLOSED": "已关闭", } def _parse_time_range(time_range: str): """解析时间范围,返回 (start_time, label)""" now = beijing_now() if time_range == "week": start = now - timedelta(days=now.weekday()) start = start.replace(hour=0, minute=0, second=0, microsecond=0) return start, "本周" elif time_range == "month": start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) return start, "本月" else: start = now.replace(hour=0, minute=0, second=0, microsecond=0) return start, "今日" def _get_camera_display_name(device_id: str) -> str: """同步获取摄像头显示名称""" try: import asyncio from app.services.camera_name_service import get_camera_name_service camera_service = get_camera_name_service() loop = asyncio.get_event_loop() if loop.is_running(): import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as pool: cam_info = pool.submit( asyncio.run, camera_service.get_camera_info(device_id) ).result(timeout=5) else: cam_info = asyncio.run(camera_service.get_camera_info(device_id)) return camera_service.format_display_name(device_id, cam_info) except Exception: return device_id @tool def query_alarm_stats(time_range: str = "today", alarm_type: str = "all") -> str: """查询告警统计数据(总数、按类型分布、按状态分布) Args: time_range: 时间范围 today=今日 week=本周 month=本月 alarm_type: 告警类型筛选 leave_post/intrusion/illegal_parking/vehicle_congestion/all """ from app.services.alarm_event_service import get_alarm_event_service svc = get_alarm_event_service() start, range_label = _parse_time_range(time_range) now = beijing_now() alarm_type_filter = None if alarm_type == "all" else alarm_type alarms, total = svc.get_alarms( alarm_type=alarm_type_filter, start_time=start, end_time=now, page=1, page_size=10000, ) type_count = {} status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} for a in alarms: type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 if a.alarm_status in status_count: status_count[a.alarm_status] += 1 result = { "range": range_label, "total": total, "by_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in type_count.items()}, "by_status": {ALARM_STATUS_NAMES.get(s, s): c for s, c in status_count.items()}, } return json.dumps(result, ensure_ascii=False) @tool def list_alarms( time_range: str = "today", alarm_type: str = "all", alarm_status: str = "", limit: int = 10, ) -> str: """查询告警列表,返回最近的告警记录(含ID、类型、摄像头、状态、时间) Args: time_range: 时间范围 today/week/month alarm_type: 告警类型筛选 leave_post/intrusion/illegal_parking/vehicle_congestion/all alarm_status: 告警状态筛选 NEW=待处理 CONFIRMED=处理中 FALSE=误报 CLOSED=已关闭 limit: 返回条数,默认10,最多20 """ from app.services.alarm_event_service import get_alarm_event_service svc = get_alarm_event_service() start, range_label = _parse_time_range(time_range) now = beijing_now() alarm_type_filter = None if alarm_type == "all" else alarm_type status_filter = alarm_status if alarm_status else None limit = min(limit, 20) alarms, total = svc.get_alarms( alarm_type=alarm_type_filter, alarm_status=status_filter, start_time=start, end_time=now, page=1, page_size=limit, ) items = [] for a in alarms: cam_name = _get_camera_display_name(a.device_id) event_time = "" if a.event_time: try: event_time = a.event_time.strftime("%m-%d %H:%M") except Exception: event_time = str(a.event_time)[:16] items.append({ "alarm_id": a.alarm_id, "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), "camera": cam_name, "status": ALARM_STATUS_NAMES.get(a.alarm_status, a.alarm_status), "level": ALARM_LEVEL_NAMES.get(a.alarm_level, "普通"), "time": event_time, }) result = {"range": range_label, "total": total, "items": items} return json.dumps(result, ensure_ascii=False) @tool def get_alarm_detail(alarm_id: str, config: RunnableConfig) -> str: """查询单条告警的详细信息(含扩展信息和AI分析结果) Args: alarm_id: 告警ID(如 edge_xxx 或 ALM_xxx) """ from app.services.alarm_event_service import get_alarm_event_service svc = get_alarm_event_service() detail = svc.get_alarm(alarm_id) if not detail: return json.dumps({"error": f"未找到告警: {alarm_id}"}, ensure_ascii=False) snapshot_url = detail.get("snapshot_url", "") result = { "alarm_id": detail.get("alarm_id"), "alarm_type": ALARM_TYPE_NAMES.get(detail.get("alarm_type", ""), detail.get("alarm_type", "")), "device_id": detail.get("device_id"), "alarm_status": ALARM_STATUS_NAMES.get(detail.get("alarm_status", ""), detail.get("alarm_status", "")), "alarm_level": ALARM_LEVEL_NAMES.get(detail.get("alarm_level"), "普通"), "event_time": str(detail.get("event_time", ""))[:19], "handle_status": detail.get("handle_status"), "handler": detail.get("handler"), "has_snapshot": bool(snapshot_url), "snapshot_url": snapshot_url, } # 摄像头名称 result["camera_name"] = _get_camera_display_name(detail.get("device_id", "")) # LLM 分析 analyses = detail.get("llm_analyses", []) if analyses: latest = analyses[-1] result["ai_analysis"] = latest.get("summary", "") return json.dumps(result, ensure_ascii=False)