Files
iot-device-management-service/app/services/daily_report_service.py

270 lines
9.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
每日告警日报定时推送服务
每天定时生成前一天的告警汇总,发送到企微群聊。
支持按边缘节点分组统计。
"""
import asyncio
from collections import Counter, defaultdict
from datetime import timedelta
from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
# Display names (Chinese) for each alarm type code; the original note says this
# table is kept in sync with the one in wechat_service.
ALARM_TYPE_NAMES: Dict[str, str] = dict(
    leave_post="人员离岗",
    intrusion="周界入侵",
    illegal_parking="车辆违停",
    vehicle_congestion="车辆拥堵",
)

# Weekday labels indexed by datetime.weekday() (0 = Monday ... 6 = Sunday).
WEEKDAY_NAMES: List[str] = [f"周{day}" for day in "一二三四五六日"]
def _get_edge_name_map() -> Dict[str, str]:
    """Load the edge device id -> display name mapping from the EdgeDevice table.

    Returns:
        A dict keyed by ``device_id``. The value is ``device_name`` when it is
        truthy, otherwise the ``device_id`` itself. An empty dict is returned
        (and a warning logged) when the query raises.
    """
    from app.models import EdgeDevice, get_session

    session = get_session()
    try:
        rows = session.query(EdgeDevice.device_id, EdgeDevice.device_name).all()
        names: Dict[str, str] = {}
        for row in rows:
            # Fall back to the raw id so every device still has a label.
            names[row.device_id] = row.device_name if row.device_name else row.device_id
        return names
    except Exception as e:
        # Best effort: the caller can still label nodes by their ids.
        logger.warning(f"查询边缘设备名称失败: {e}")
        return {}
    finally:
        session.close()
def _format_resp_time(minutes: float) -> str:
    """Format a response duration given in minutes for display.

    Durations that display as less than 60.0 minutes are shown in minutes,
    everything else in hours; both use one decimal place.

    Args:
        minutes: Duration in minutes.

    Returns:
        A string such as ``"12.5分钟"`` or ``"1.5小时"``.
    """
    # Pick the unit from the value as it will be displayed (one decimal), so
    # inputs in the 59.95-59.99 range become "1.0小时" instead of "60.0分钟".
    if round(minutes, 1) < 60:
        return f"{minutes:.1f}分钟"
    return f"{minutes / 60:.1f}小时"
def _describe_day_over_day(yesterday_total: int, prev_total: int) -> str:
    """Build the day-over-day comparison text shown beside yesterday's total."""
    if prev_total <= 0:
        # No baseline to compare against (this also avoids dividing by zero).
        return "前日无告警"
    change_pct = (yesterday_total - prev_total) / prev_total * 100
    if change_pct > 0:
        return f"前日{prev_total}条,↑{change_pct:.1f}%"
    if change_pct < 0:
        return f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
    return f"前日{prev_total}条,持平"


def _render_edge_sections(
    edge_groups: Dict[str, List],
    edge_top3: Dict[str, List],
    edge_name_map: Dict[str, str],
    name_map: Dict[str, str],
) -> List[str]:
    """Render the per-edge-node summary lines, busiest node first.

    Args:
        edge_groups: Alarms grouped by edge node key ("unknown" when absent).
        edge_top3: For each edge key, its ``(device_id, count)`` top-3 list.
        edge_name_map: Edge node key -> display name.
        name_map: Device id -> camera display name.

    Returns:
        The Markdown lines of the section, including its heading.
    """
    section: List[str] = ["", "**各边缘节点汇总**"]
    busiest_first = sorted(edge_groups.items(), key=lambda item: len(item[1]), reverse=True)
    for edge_key, alarms in busiest_first:
        if edge_key == "unknown":
            edge_name = "未知节点"
        else:
            edge_name = edge_name_map.get(edge_key, edge_key)

        edge_total = len(alarms)
        edge_unhandled = sum(1 for a in alarms if a.handle_status not in ("DONE", "IGNORED"))
        edge_false = sum(1 for a in alarms if a.alarm_status == "FALSE")

        # Items are explicitly separated so that a name ending in a digit
        # cannot run into the count that follows it.
        type_summary = "、".join(
            f"{ALARM_TYPE_NAMES.get(alarm_type, alarm_type)} {count}条"
            for alarm_type, count in Counter(a.alarm_type for a in alarms).most_common()
        )
        cam_summary = "、".join(
            f"{name_map.get(dev_id, dev_id)}({dev_count})"
            for dev_id, dev_count in edge_top3[edge_key]
        )

        section.append("")
        section.append(f'>{edge_name}:<font color="warning">{edge_total}</font> 条')
        section.append(f"> 类型:{type_summary}")
        if edge_unhandled > 0:
            section.append(
                f'> 待处理:<font color="warning">{edge_unhandled}</font> 条 误报:{edge_false}'
            )
        section.append(f"> Top设备:{cam_summary}")
    return section


async def generate_daily_report() -> Optional[str]:
    """Generate the Markdown body of yesterday's alarm report.

    "Yesterday" is the 24 hours before today's midnight, based on the
    timestamp returned by ``beijing_now()``. The report contains overall
    totals with a day-over-day comparison, handling status counts, the average
    response time, a per-type breakdown, the top 5 devices and, when alarms
    carry edge node ids, a per-edge-node summary.

    Returns:
        The Markdown text. A short "no alarms" notice is returned when
        yesterday's total is zero. The visible code paths always return a
        string; ``Optional`` is kept so the signature stays unchanged.
    """
    from app.services.alarm_event_service import get_alarm_event_service
    from app.services.camera_name_service import get_camera_name_service
    from app.utils.timezone import beijing_now

    svc = get_alarm_event_service()
    camera_svc = get_camera_name_service()

    now = beijing_now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_start = today_start - timedelta(days=1)
    day_before_start = today_start - timedelta(days=2)

    # Yesterday: fetch the rows themselves. Day before: only the total is
    # needed, so a single-row page is enough.
    yesterday_alarms, yesterday_total = svc.get_alarms(
        start_time=yesterday_start, end_time=today_start, page=1, page_size=10000
    )
    _, prev_total = svc.get_alarms(
        start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1
    )

    date_str = yesterday_start.strftime("%m-%d")
    weekday = WEEKDAY_NAMES[yesterday_start.weekday()]

    # Nothing happened: send a short notice instead of an empty report.
    if yesterday_total == 0:
        return (
            f"**AI安防日报 — {date_str}{weekday}**\n\n"
            f">昨日告警总计:**0** 条\n"
            f">系统运行正常,无异常事件"
        )

    # The breakdowns below only see the rows of the first page, while the
    # headline uses the full total; make a mismatch visible in the logs.
    if yesterday_total > len(yesterday_alarms):
        logger.warning(
            f"日报统计仅覆盖 {len(yesterday_alarms)}/{yesterday_total} 条告警,分项统计可能不完整"
        )

    # ---- Global statistics and grouping by edge node ----
    type_counter: Counter = Counter()
    device_counter: Counter = Counter()
    handle_done = 0
    handle_ignored = 0
    handle_unhandled = 0
    false_alarm = 0
    response_times: List[float] = []
    edge_groups: Dict[str, List] = defaultdict(list)

    for a in yesterday_alarms:
        type_counter[a.alarm_type] += 1
        device_counter[a.device_id] += 1
        # Alarms without an edge_node_id are collected under "unknown".
        edge_groups[a.edge_node_id or "unknown"].append(a)

        if a.handle_status == "DONE":
            handle_done += 1
        elif a.handle_status == "IGNORED":
            handle_ignored += 1
        else:
            handle_unhandled += 1

        if a.alarm_status == "FALSE":
            false_alarm += 1

        if a.handled_at and a.event_time:
            delta = (a.handled_at - a.event_time).total_seconds() / 60.0
            if 0 <= delta <= 360:  # ignore negative values and outliers above 6 hours
                response_times.append(delta)

    change_str = _describe_day_over_day(yesterday_total, prev_total)

    if response_times:
        resp_str = _format_resp_time(sum(response_times) / len(response_times))
    else:
        resp_str = "暂无数据"

    # ---- Camera names: one batch lookup for every device that will be shown ----
    top5_devices = device_counter.most_common(5)
    # Computed once here and reused when rendering the per-edge section.
    edge_top3 = {
        edge_key: Counter(a.device_id for a in alarms).most_common(3)
        for edge_key, alarms in edge_groups.items()
    }
    # A dict gives O(1) de-duplication while keeping a stable order.
    wanted_ids = dict.fromkeys(dev_id for dev_id, _ in top5_devices)
    for top3 in edge_top3.values():
        for dev_id, _ in top3:
            wanted_ids.setdefault(dev_id)
    all_device_ids = list(wanted_ids)

    try:
        name_map = await camera_svc.get_display_names_batch(all_device_ids)
    except Exception as e:
        # Best effort: fall back to raw device ids in the report.
        logger.warning(f"日报获取摄像头名称失败: {e}")
        name_map = {}

    edge_name_map = _get_edge_name_map()

    # ==================== Assemble Markdown ====================
    lines = [
        f"**AI安防日报 — {date_str}{weekday}**",
        "",
        f'>昨日告警总计:<font color="warning">{yesterday_total}</font> 条({change_str})',
        f'>待处理:<font color="warning">{handle_unhandled}</font> 条',
        f">已处理:{handle_done}条 | 已忽略:{handle_ignored}条 | 误报:{false_alarm}",
        f'>平均响应:<font color="info">{resp_str}</font>',
    ]

    # Breakdown by alarm type.
    if type_counter:
        lines.append("")
        lines.append("**按类型分布**")
        for alarm_type, count in type_counter.most_common():
            type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
            lines.append(f">{type_name}:{count}条")

    # Top 5 devices by alarm count.
    if top5_devices:
        lines.append("")
        lines.append("**告警设备 Top5**")
        for i, (device_id, count) in enumerate(top5_devices, 1):
            display_name = name_map.get(device_id, device_id)
            lines.append(f">{i}. {display_name}:{count}条")

    # Per-edge summary; skipped when the only group is the "unknown" bucket
    # (or there are no groups at all).
    if edge_groups and set(edge_groups) != {"unknown"}:
        lines.extend(_render_edge_sections(edge_groups, edge_top3, edge_name_map, name_map))

    return "\n".join(lines)
async def _send_daily_report():
    """Generate the daily report and post it to the configured group chat.

    Does nothing (after logging a warning) when ``settings.wechat.group_chat_id``
    is empty. Any exception raised while generating or sending is logged and
    not propagated.
    """
    from app.services.wechat_service import get_wechat_service

    target_chat = settings.wechat.group_chat_id
    if not target_chat:
        logger.warning("日报发送跳过:未配置 group_chat_id")
        return

    try:
        report = await generate_daily_report()
        if not report:
            logger.info("日报生成内容为空,跳过发送")
            return

        sender = get_wechat_service()
        delivered = await sender.send_group_markdown(target_chat, report)
        if not delivered:
            logger.error("日报发送失败")
        else:
            logger.info("日报已发送到企微群聊")
    except Exception:
        # Keep the scheduler loop alive; the traceback goes to the log.
        logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
    """Return the number of seconds from now until the next HH:MM.

    "Now" is the value returned by ``beijing_now()``. If today's HH:MM is
    already reached (including exactly now), the next day's HH:MM is used.
    """
    from app.utils.timezone import beijing_now

    current = beijing_now()
    next_run = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if not next_run > current:
        # Today's slot is not in the future any more: roll over to tomorrow.
        next_run = next_run + timedelta(days=1)
    remaining = next_run - current
    return remaining.total_seconds()
async def start_daily_report_scheduler():
    """Run the daily report loop until the task is cancelled.

    Each iteration sleeps until the send time configured in
    ``settings.daily_report`` (``send_hour`` / ``send_minute``), sends the
    report, then pauses 61 seconds. Cancellation is logged and absorbed, so
    the coroutine returns normally.
    """
    send_hour = settings.daily_report.send_hour
    send_minute = settings.daily_report.send_minute
    logger.info(f"日报定时任务已启动,每日 {send_hour:02d}:{send_minute:02d} 发送")

    try:
        while True:
            delay = _seconds_until(send_hour, send_minute)
            logger.debug(f"日报下次发送倒计时 {delay:.0f}")
            await asyncio.sleep(delay)

            await _send_daily_report()

            # Step past the trigger minute so the same slot cannot fire twice.
            await asyncio.sleep(61)
    except asyncio.CancelledError:
        logger.info("日报定时任务已停止")