diff --git a/app/config.py b/app/config.py index 795b479..f0d0c92 100644 --- a/app/config.py +++ b/app/config.py @@ -107,6 +107,14 @@ class EdgeAuthConfig: enabled: bool = False # 是否启用 token 校验 +@dataclass +class DailyReportConfig: + """日报配置""" + enabled: bool = False + send_hour: int = 9 + send_minute: int = 0 + + @dataclass class CameraNameConfig: """摄像头名称配置""" @@ -125,6 +133,7 @@ class Settings(BaseModel): agent: AgentConfig = AgentConfig() work_order: WorkOrderConfig = WorkOrderConfig() redis: RedisConfig = RedisConfig() + daily_report: DailyReportConfig = DailyReportConfig() camera_name: CameraNameConfig = CameraNameConfig() edge_auth: EdgeAuthConfig = EdgeAuthConfig() @@ -203,6 +212,11 @@ def load_settings() -> Settings: wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"), query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")), ), + daily_report=DailyReportConfig( + enabled=os.getenv("DAILY_REPORT_ENABLED", "false").lower() == "true", + send_hour=int(os.getenv("DAILY_REPORT_HOUR", "9")), + send_minute=int(os.getenv("DAILY_REPORT_MINUTE", "0")), + ), edge_auth=EdgeAuthConfig( token=os.getenv("EDGE_AUTH_TOKEN", ""), enabled=os.getenv("EDGE_AUTH_ENABLED", "false").lower() == "true", diff --git a/app/main.py b/app/main.py index 5d7ba42..5a05d06 100644 --- a/app/main.py +++ b/app/main.py @@ -72,11 +72,23 @@ async def lifespan(app: FastAPI): wo_client = get_work_order_client() wo_client.init(settings.work_order) + # 启动日报定时任务 + report_task = None + if settings.daily_report.enabled and settings.wechat.group_chat_id: + from app.services.daily_report_service import start_daily_report_scheduler + report_task = asyncio.create_task(start_daily_report_scheduler()) + logger.info("AI 告警平台启动完成") yield # 关闭 + if report_task: + report_task.cancel() + try: + await report_task + except asyncio.CancelledError: + pass logger.info("AI 告警平台已关闭") diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py new file mode 100644 index 0000000..50654a2 --- /dev/null +++ b/app/services/daily_report_service.py @@ -0,0 +1,202 @@ +""" +每日告警日报定时推送服务 + +每天定时生成前一天的告警汇总,发送到企微群聊。 +""" +import asyncio +from datetime import datetime, timedelta +from collections import Counter +from typing import Dict, List, Optional, Tuple + +from app.utils.logger import logger +from app.config import settings + +# 告警类型中文映射(与 wechat_service 保持一致) +ALARM_TYPE_NAMES = { + "leave_post": "人员离岗", + "intrusion": "周界入侵", + "illegal_parking": "车辆违停", + "vehicle_congestion": "车辆拥堵", +} + +WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] + + +async def generate_daily_report() -> Optional[str]: + """生成昨日告警日报 Markdown 内容""" + from app.services.alarm_event_service import get_alarm_event_service + from app.services.camera_name_service import get_camera_name_service + from app.utils.timezone import beijing_now + + svc = get_alarm_event_service() + camera_svc = get_camera_name_service() + + now = beijing_now() + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + yesterday_start = today_start - timedelta(days=1) + day_before_start = today_start - timedelta(days=2) + + # 查询昨日和前日全量告警(page_size 设大,拿全量) + yesterday_alarms, yesterday_total = svc.get_alarms( + start_time=yesterday_start, end_time=today_start, page=1, page_size=10000 + ) + _, prev_total = svc.get_alarms( + start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1 + ) + + # 无告警时发送简短通知 + if yesterday_total == 0: + date_str = yesterday_start.strftime("%m-%d") + weekday = WEEKDAY_NAMES[yesterday_start.weekday()] + return ( + f"**AI安防日报 — {date_str}({weekday})**\n\n" + f">昨日告警总计:**0** 条\n" + f">系统运行正常,无异常事件" + ) + + # ---- 统计 ---- + type_counter: Counter = Counter() + device_counter: Counter = Counter() + handle_done = 0 + handle_ignored = 0 + handle_unhandled = 0 + false_alarm = 0 + response_times: List[float] = [] + + for a in yesterday_alarms: + type_counter[a.alarm_type] += 1 + device_counter[a.device_id] += 1 + + # 处理状态统计 + if a.handle_status == "DONE": + handle_done += 1 + elif a.handle_status == "IGNORED": + handle_ignored += 1 + else: + handle_unhandled += 1 + + # 误报统计 + if a.alarm_status == "FALSE": + false_alarm += 1 + + # 响应时长(从 event_time 到 handled_at) + if a.handled_at and a.event_time: + delta = (a.handled_at - a.event_time).total_seconds() / 60.0 + if delta >= 0: + response_times.append(delta) + + # 环比变化 + if prev_total > 0: + change_pct = (yesterday_total - prev_total) / prev_total * 100 + if change_pct > 0: + change_str = f"前日{prev_total}条,↑{change_pct:.1f}%" + elif change_pct < 0: + change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%" + else: + change_str = f"前日{prev_total}条,持平" + else: + change_str = "前日无告警" + + # 平均响应时长 + if response_times: + avg_resp = sum(response_times) / len(response_times) + if avg_resp < 60: + resp_str = f"{avg_resp:.1f}分钟" + else: + resp_str = f"{avg_resp / 60:.1f}小时" + else: + resp_str = "暂无数据" + + # 设备 Top5 — 批量获取摄像头名称 + top5_devices = device_counter.most_common(5) + device_ids = [d[0] for d in top5_devices] + try: + name_map = await camera_svc.get_display_names_batch(device_ids) + except Exception as e: + logger.warning(f"日报获取摄像头名称失败: {e}") + name_map = {} + + # ---- 组装 Markdown ---- + date_str = yesterday_start.strftime("%m-%d") + weekday = WEEKDAY_NAMES[yesterday_start.weekday()] + + lines = [ + f"**AI安防日报 — {date_str}({weekday})**", + "", + f">昨日告警总计:{yesterday_total} 条({change_str})", + f">待处理:{handle_unhandled} 条", + f">已处理:{handle_done}条 | 已忽略:{handle_ignored}条 | 误报:{false_alarm}条", + f">平均响应:{resp_str}", + ] + + # 按类型分布 + if type_counter: + lines.append("") + lines.append("**按类型分布**") + for alarm_type, count in type_counter.most_common(): + type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) + lines.append(f">{type_name}:{count}条") + + # 设备 Top5 + if top5_devices: + lines.append("") + lines.append("**告警设备 Top5**") + for i, (device_id, count) in enumerate(top5_devices, 1): + display_name = name_map.get(device_id, device_id) + lines.append(f">{i}. {display_name} — {count}条") + + return "\n".join(lines) + + +async def _send_daily_report(): + """生成并发送日报""" + from app.services.wechat_service import get_wechat_service + + chat_id = settings.wechat.group_chat_id + if not chat_id: + logger.warning("日报发送跳过:未配置 group_chat_id") + return + + try: + content = await generate_daily_report() + if not content: + logger.info("日报生成内容为空,跳过发送") + return + + wechat_svc = get_wechat_service() + ok = await wechat_svc.send_group_markdown(chat_id, content) + if ok: + logger.info("日报已发送到企微群聊") + else: + logger.error("日报发送失败") + except Exception: + logger.exception("日报生成或发送异常") + + +def _seconds_until(hour: int, minute: int) -> float: + """计算距离下一个 HH:MM 的秒数""" + from app.utils.timezone import beijing_now + + now = beijing_now() + target = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if target <= now: + target += timedelta(days=1) + return (target - now).total_seconds() + + +async def start_daily_report_scheduler(): + """日报定时调度主循环""" + hour = settings.daily_report.send_hour + minute = settings.daily_report.send_minute + logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送") + + try: + while True: + wait = _seconds_until(hour, minute) + logger.debug(f"日报下次发送倒计时 {wait:.0f} 秒") + await asyncio.sleep(wait) + await _send_daily_report() + # 发送完等 61 秒,避免同一分钟内重复触发 + await asyncio.sleep(61) + except asyncio.CancelledError: + logger.info("日报定时任务已停止")