Files
iot-device-management-service/app/services/daily_report_service.py
16337 2a36bd4ffc 功能:每日告警日报定时推送到企微群聊
- 新增 DailyReportConfig 配置(DAILY_REPORT_ENABLED/HOUR/MINUTE)
- 新增 daily_report_service:asyncio 定时调度,每日生成前一天告警汇总
- 日报内容:告警总数、环比变化、处理状态分布、类型分布、设备 Top5、平均响应时长
- lifespan 中启动/停止定时任务
2026-03-24 17:43:52 +08:00

203 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
每日告警日报定时推送服务
每天定时生成前一天的告警汇总,发送到企微群聊。
"""
import asyncio
from datetime import datetime, timedelta
from collections import Counter
from typing import Dict, List, Optional, Tuple
from app.utils.logger import logger
from app.config import settings
# 告警类型中文映射(与 wechat_service 保持一致)
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
}
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
async def generate_daily_report() -> Optional[str]:
"""生成昨日告警日报 Markdown 内容"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
from app.utils.timezone import beijing_now
svc = get_alarm_event_service()
camera_svc = get_camera_name_service()
now = beijing_now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_start = today_start - timedelta(days=1)
day_before_start = today_start - timedelta(days=2)
# 查询昨日和前日全量告警page_size 设大,拿全量)
yesterday_alarms, yesterday_total = svc.get_alarms(
start_time=yesterday_start, end_time=today_start, page=1, page_size=10000
)
_, prev_total = svc.get_alarms(
start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1
)
# 无告警时发送简短通知
if yesterday_total == 0:
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
return (
f"**AI安防日报 — {date_str}{weekday}**\n\n"
f">昨日告警总计:**0** 条\n"
f">系统运行正常,无异常事件"
)
# ---- 统计 ----
type_counter: Counter = Counter()
device_counter: Counter = Counter()
handle_done = 0
handle_ignored = 0
handle_unhandled = 0
false_alarm = 0
response_times: List[float] = []
for a in yesterday_alarms:
type_counter[a.alarm_type] += 1
device_counter[a.device_id] += 1
# 处理状态统计
if a.handle_status == "DONE":
handle_done += 1
elif a.handle_status == "IGNORED":
handle_ignored += 1
else:
handle_unhandled += 1
# 误报统计
if a.alarm_status == "FALSE":
false_alarm += 1
# 响应时长(从 event_time 到 handled_at
if a.handled_at and a.event_time:
delta = (a.handled_at - a.event_time).total_seconds() / 60.0
if delta >= 0:
response_times.append(delta)
# 环比变化
if prev_total > 0:
change_pct = (yesterday_total - prev_total) / prev_total * 100
if change_pct > 0:
change_str = f"前日{prev_total}条,↑{change_pct:.1f}%"
elif change_pct < 0:
change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
else:
change_str = f"前日{prev_total}条,持平"
else:
change_str = "前日无告警"
# 平均响应时长
if response_times:
avg_resp = sum(response_times) / len(response_times)
if avg_resp < 60:
resp_str = f"{avg_resp:.1f}分钟"
else:
resp_str = f"{avg_resp / 60:.1f}小时"
else:
resp_str = "暂无数据"
# 设备 Top5 — 批量获取摄像头名称
top5_devices = device_counter.most_common(5)
device_ids = [d[0] for d in top5_devices]
try:
name_map = await camera_svc.get_display_names_batch(device_ids)
except Exception as e:
logger.warning(f"日报获取摄像头名称失败: {e}")
name_map = {}
# ---- 组装 Markdown ----
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
lines = [
f"**AI安防日报 — {date_str}{weekday}**",
"",
f">昨日告警总计:<font color=\"warning\">{yesterday_total}</font> 条({change_str}",
f">待处理:<font color=\"warning\">{handle_unhandled}</font> 条",
f">已处理:{handle_done}条 | 已忽略:{handle_ignored}条 | 误报:{false_alarm}",
f">平均响应:<font color=\"info\">{resp_str}</font>",
]
# 按类型分布
if type_counter:
lines.append("")
lines.append("**按类型分布**")
for alarm_type, count in type_counter.most_common():
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
lines.append(f">{type_name}{count}")
# 设备 Top5
if top5_devices:
lines.append("")
lines.append("**告警设备 Top5**")
for i, (device_id, count) in enumerate(top5_devices, 1):
display_name = name_map.get(device_id, device_id)
lines.append(f">{i}. {display_name}{count}")
return "\n".join(lines)
async def _send_daily_report():
"""生成并发送日报"""
from app.services.wechat_service import get_wechat_service
chat_id = settings.wechat.group_chat_id
if not chat_id:
logger.warning("日报发送跳过:未配置 group_chat_id")
return
try:
content = await generate_daily_report()
if not content:
logger.info("日报生成内容为空,跳过发送")
return
wechat_svc = get_wechat_service()
ok = await wechat_svc.send_group_markdown(chat_id, content)
if ok:
logger.info("日报已发送到企微群聊")
else:
logger.error("日报发送失败")
except Exception:
logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
"""计算距离下一个 HH:MM 的秒数"""
from app.utils.timezone import beijing_now
now = beijing_now()
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
return (target - now).total_seconds()
async def start_daily_report_scheduler():
"""日报定时调度主循环"""
hour = settings.daily_report.send_hour
minute = settings.daily_report.send_minute
logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
try:
while True:
wait = _seconds_until(hour, minute)
logger.debug(f"日报下次发送倒计时 {wait:.0f}")
await asyncio.sleep(wait)
await _send_daily_report()
# 发送完等 61 秒,避免同一分钟内重复触发
await asyncio.sleep(61)
except asyncio.CancelledError:
logger.info("日报定时任务已停止")