diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py index 50654a2..95e3c9e 100644 --- a/app/services/daily_report_service.py +++ b/app/services/daily_report_service.py @@ -2,11 +2,12 @@ 每日告警日报定时推送服务 每天定时生成前一天的告警汇总,发送到企微群聊。 +支持按边缘节点分组统计。 """ import asyncio -from datetime import datetime, timedelta -from collections import Counter -from typing import Dict, List, Optional, Tuple +from collections import Counter, defaultdict +from datetime import timedelta +from typing import Dict, List, Optional from app.utils.logger import logger from app.config import settings @@ -22,8 +23,29 @@ ALARM_TYPE_NAMES = { WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] +def _get_edge_name_map() -> Dict[str, str]: + """从 EdgeDevice 表获取 edge_node_id → device_name 映射""" + from app.models import get_session, EdgeDevice + db = get_session() + try: + devices = db.query(EdgeDevice.device_id, EdgeDevice.device_name).all() + return {d.device_id: (d.device_name or d.device_id) for d in devices} + except Exception as e: + logger.warning(f"查询边缘设备名称失败: {e}") + return {} + finally: + db.close() + + +def _format_resp_time(minutes: float) -> str: + """格式化响应时长""" + if minutes < 60: + return f"{minutes:.1f}分钟" + return f"{minutes / 60:.1f}小时" + + async def generate_daily_report() -> Optional[str]: - """生成昨日告警日报 Markdown 内容""" + """生成昨日告警日报 Markdown 内容(含按边缘节点分组)""" from app.services.alarm_event_service import get_alarm_event_service from app.services.camera_name_service import get_camera_name_service from app.utils.timezone import beijing_now @@ -36,7 +58,7 @@ async def generate_daily_report() -> Optional[str]: yesterday_start = today_start - timedelta(days=1) day_before_start = today_start - timedelta(days=2) - # 查询昨日和前日全量告警(page_size 设大,拿全量) + # 查询昨日和前日全量告警 yesterday_alarms, yesterday_total = svc.get_alarms( start_time=yesterday_start, end_time=today_start, page=1, page_size=10000 ) @@ -44,17 +66,18 @@ async def generate_daily_report() -> Optional[str]: start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1 ) + date_str = yesterday_start.strftime("%m-%d") + weekday = WEEKDAY_NAMES[yesterday_start.weekday()] + # 无告警时发送简短通知 if yesterday_total == 0: - date_str = yesterday_start.strftime("%m-%d") - weekday = WEEKDAY_NAMES[yesterday_start.weekday()] return ( f"**AI安防日报 — {date_str}({weekday})**\n\n" f">昨日告警总计:**0** 条\n" f">系统运行正常,无异常事件" ) - # ---- 统计 ---- + # ---- 全局统计 ---- type_counter: Counter = Counter() device_counter: Counter = Counter() handle_done = 0 @@ -63,11 +86,17 @@ async def generate_daily_report() -> Optional[str]: false_alarm = 0 response_times: List[float] = [] + # ---- 按边缘节点分组 ---- + edge_groups: Dict[str, List] = defaultdict(list) + for a in yesterday_alarms: type_counter[a.alarm_type] += 1 device_counter[a.device_id] += 1 - # 处理状态统计 + # 按 edge_node_id 分组(无 edge_node_id 归入 "unknown") + edge_key = a.edge_node_id or "unknown" + edge_groups[edge_key].append(a) + if a.handle_status == "DONE": handle_done += 1 elif a.handle_status == "IGNORED": @@ -75,14 +104,12 @@ async def generate_daily_report() -> Optional[str]: else: handle_unhandled += 1 - # 误报统计 if a.alarm_status == "FALSE": false_alarm += 1 - # 响应时长(从 event_time 到 handled_at) if a.handled_at and a.event_time: delta = (a.handled_at - a.event_time).total_seconds() / 60.0 - if delta >= 0: + if 0 <= delta <= 360: # 排除 >6h 异常值 response_times.append(delta) # 环比变化 @@ -98,28 +125,28 @@ async def generate_daily_report() -> Optional[str]: change_str = "前日无告警" # 平均响应时长 - if response_times: - avg_resp = sum(response_times) / len(response_times) - if avg_resp < 60: - resp_str = f"{avg_resp:.1f}分钟" - else: - resp_str = f"{avg_resp / 60:.1f}小时" - else: - resp_str = "暂无数据" + resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据" # 设备 Top5 — 批量获取摄像头名称 top5_devices = device_counter.most_common(5) - device_ids = [d[0] for d in top5_devices] + all_device_ids = list({d[0] for d in top5_devices}) + # 也收集各边缘节点的 top 设备 ID + for edge_key, alarms in edge_groups.items(): + edge_dev_counter = Counter(a.device_id for a in alarms) + for dev_id, _ in edge_dev_counter.most_common(3): + if dev_id not in all_device_ids: + all_device_ids.append(dev_id) + try: - name_map = await camera_svc.get_display_names_batch(device_ids) + name_map = await camera_svc.get_display_names_batch(all_device_ids) except Exception as e: logger.warning(f"日报获取摄像头名称失败: {e}") name_map = {} - # ---- 组装 Markdown ---- - date_str = yesterday_start.strftime("%m-%d") - weekday = WEEKDAY_NAMES[yesterday_start.weekday()] + # 边缘设备名称映射 + edge_name_map = _get_edge_name_map() + # ==================== 组装 Markdown ==================== lines = [ f"**AI安防日报 — {date_str}({weekday})**", "", @@ -145,6 +172,46 @@ async def generate_daily_report() -> Optional[str]: display_name = name_map.get(device_id, device_id) lines.append(f">{i}. {display_name} — {count}条") + # ---- 按边缘节点分组汇总 ---- + if len(edge_groups) > 1 or (len(edge_groups) == 1 and "unknown" not in edge_groups): + lines.append("") + lines.append("**各边缘节点汇总**") + + # 按告警数降序排列 + sorted_edges = sorted(edge_groups.items(), key=lambda x: len(x[1]), reverse=True) + + for edge_key, alarms in sorted_edges: + edge_name = edge_name_map.get(edge_key, edge_key) + if edge_key == "unknown": + edge_name = "未知节点" + + edge_total = len(alarms) + edge_type_counter = Counter(a.alarm_type for a in alarms) + edge_unhandled = sum(1 for a in alarms if a.handle_status not in ("DONE", "IGNORED")) + edge_false = sum(1 for a in alarms if a.alarm_status == "FALSE") + + # 类型分布简写 + type_parts = [] + for at, ac in edge_type_counter.most_common(): + type_parts.append(f"{ALARM_TYPE_NAMES.get(at, at)}{ac}条") + type_summary = "、".join(type_parts) + + # 该节点的 Top3 摄像头 + edge_dev_counter = Counter(a.device_id for a in alarms) + top3 = edge_dev_counter.most_common(3) + cam_parts = [] + for dev_id, dev_count in top3: + cam_name = name_map.get(dev_id, dev_id) + cam_parts.append(f"{cam_name}({dev_count})") + cam_summary = "、".join(cam_parts) + + lines.append(f"") + lines.append(f">{edge_name}:{edge_total} 条") + lines.append(f"> 类型:{type_summary}") + if edge_unhandled > 0: + lines.append(f"> 待处理:{edge_unhandled} 条 误报:{edge_false}条") + lines.append(f"> Top设备:{cam_summary}") + return "\n".join(lines)