Files
iot-device-management-service/app/services/report_generator.py
16337 dd86da5bcd 功能:告警级别体系统一为 0紧急/1重要/2普通/3轻微
更新 API 接口、Schema 验证、报告生成、企微通知的级别映射,
与前端和边缘端保持一致。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 16:39:16 +08:00

167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
告警报表生成器
生成 Excel 格式的告警汇总报表,包含告警明细和统计汇总两个 Sheet。
"""
import io
from datetime import timedelta
from typing import Optional, Tuple
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from app.models import AlarmEvent, get_session
from app.utils.logger import logger
from app.utils.timezone import beijing_now
# Alarm type code -> label shown in the report.
TYPE_NAMES = {
    "leave_post": "人员离岗",
    "intrusion": "周界入侵",
}

# Alarm severity code -> label shown in the report
# (0 urgent, 1 important, 2 normal, 3 minor).
LEVEL_NAMES = {
    0: "紧急",
    1: "重要",
    2: "普通",
    3: "轻微",
}

# Alarm status code -> label shown in the report.
STATUS_NAMES = {
    "NEW": "待处理",
    "CONFIRMED": "已确认",
    "FALSE": "误报",
    "CLOSED": "已关闭",
}
def generate_alarm_report(
    time_range: str = "week",
) -> Optional[Tuple[str, io.BytesIO]]:
    """
    Build the alarm summary report as an in-memory Excel workbook.

    The workbook has two sheets: a per-alarm detail sheet and a statistics
    sheet with counts grouped by alarm type, status and level.

    Args:
        time_range: "today", "week" or "month". Any other value is treated
            like "today".

    Returns:
        (filename, bytes_io) with the stream rewound to position 0, or None
        when no alarm falls inside the window. None is also returned when
        report generation raises; the error is logged in that case.
    """
    now = beijing_now()
    start, label = _resolve_report_window(time_range, now)

    db = get_session()
    try:
        alarms = (
            db.query(AlarmEvent)
            .filter(AlarmEvent.event_time >= start, AlarmEvent.event_time <= now)
            .order_by(AlarmEvent.event_time.desc())
            .all()
        )
        if not alarms:
            return None

        wb = Workbook()
        _write_detail_sheet(wb.active, alarms)
        _write_summary_sheet(wb.create_sheet("统计汇总"), alarms)

        # Serialize to memory and rewind so the caller can read from the start.
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)

        filename = f"告警报表_{label}.xlsx"
        logger.info(f"报表已生成: {filename}, 告警数={len(alarms)}")
        return (filename, output)
    except Exception as e:
        # Best-effort boundary: callers receive None instead of an exception.
        logger.error(f"生成报表失败: {e}", exc_info=True)
        return None
    finally:
        db.close()


def _resolve_report_window(time_range, now):
    """Return ``(start, label)``: the window start and the filename label."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if time_range == "week":
        # weekday() is 0 on Monday, so this rewinds to Monday 00:00.
        start = midnight - timedelta(days=now.weekday())
        return start, f"{start.strftime('%Y%m%d')}-{now.strftime('%Y%m%d')}"
    if time_range == "month":
        return midnight.replace(day=1), now.strftime("%Y%m")
    # "today" and every unrecognised value share the same window.
    return midnight, now.strftime("%Y%m%d")


def _write_detail_sheet(ws, alarms):
    """Fill ``ws`` with a styled header row and one bordered row per alarm."""
    ws.title = "告警明细"
    headers = [
        "告警ID", "告警类型", "设备ID", "场景ID", "告警级别",
        "告警状态", "处理状态", "置信度", "事件时间", "处理人", "备注",
    ]
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(color="FFFFFF", bold=True, size=11)
    header_alignment = Alignment(horizontal="center")
    thin = Side(style="thin")
    thin_border = Border(left=thin, right=thin, top=thin, bottom=thin)

    for col, text in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=text)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment
        cell.border = thin_border

    for row, alarm in enumerate(alarms, 2):
        confidence = alarm.confidence_score
        event_time = alarm.event_time
        values = [
            alarm.alarm_id,
            TYPE_NAMES.get(alarm.alarm_type, alarm.alarm_type),
            alarm.device_id,
            alarm.scene_id or "",
            LEVEL_NAMES.get(alarm.alarm_level, str(alarm.alarm_level or "")),
            STATUS_NAMES.get(alarm.alarm_status, alarm.alarm_status or ""),
            alarm.handle_status or "",
            # Compare against None so a real score of 0.0 still shows as "0.00".
            f"{confidence:.2f}" if confidence is not None else "",
            event_time.strftime("%Y-%m-%d %H:%M:%S") if event_time else "",
            alarm.handler or "",
            alarm.handle_remark or "",
        ]
        for col, value in enumerate(values, 1):
            ws.cell(row=row, column=col, value=value).border = thin_border

    # Auto column width: longest cell text plus padding, capped at 30.
    for col_cells in ws.columns:
        longest = max(len(str(cell.value or "")) for cell in col_cells)
        ws.column_dimensions[col_cells[0].column_letter].width = min(longest + 4, 30)


def _write_summary_sheet(ws, alarms):
    """Fill ``ws`` with count tables grouped by alarm type, status and level."""
    # Function-scope import keeps this block independent of the file header.
    from collections import Counter

    # Counter keeps first-seen key order, which fixes the row order below.
    type_count = Counter(alarm.alarm_type for alarm in alarms)
    status_count = Counter(alarm.alarm_status for alarm in alarms)
    level_count = Counter(alarm.alarm_level for alarm in alarms)

    sections = [
        (
            "告警类型统计", "类型",
            [(TYPE_NAMES.get(key, key), n) for key, n in type_count.items()],
        ),
        (
            "告警状态统计", "状态",
            [(STATUS_NAMES.get(key, key), n) for key, n in status_count.items()],
        ),
        (
            "告警级别统计", "级别",
            [(LEVEL_NAMES.get(key, str(key)), n) for key, n in level_count.items()],
        ),
    ]

    title_font = Font(bold=True, size=12)
    top_row = 1
    for title, column_label, rows in sections:
        ws.cell(row=top_row, column=1, value=title).font = title_font
        ws.cell(row=top_row + 1, column=1, value=column_label)
        ws.cell(row=top_row + 1, column=2, value="数量")
        for row, (name, count) in enumerate(rows, top_row + 2):
            ws.cell(row=row, column=1, value=name)
            ws.cell(row=row, column=2, value=count)
        # Title row + header row + data rows + one blank separator row.
        top_row += len(rows) + 3