Files
iot-device-management-service/app/services/report_generator.py
16337 10b25742f8 fix: 剩余服务文件时间统一为北京时间
- agent_dispatcher、alert_service、report_generator、
  oss_storage、work_order_service 全部使用 beijing_now()
- 全局无遗留 UTC 时间调用

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 16:29:28 +08:00

167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Alarm report generator.

Builds an Excel alarm summary report containing two sheets: one with the
per-alarm details and one with aggregated statistics.
"""
import io
from collections import Counter
from datetime import timedelta
from typing import Optional, Tuple

from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side

from app.models import AlarmEvent, get_session
from app.utils.logger import logger
from app.utils.timezone import beijing_now
# Display labels used when writing alarm attributes into the report.
# Codes that are missing from a table are handled by the caller's fallback.

# Alarm type code -> label.
TYPE_NAMES = {
    "leave_post": "人员离岗",
    "intrusion": "周界入侵",
}

# Alarm level -> label.
LEVEL_NAMES = {
    1: "提醒",
    2: "一般",
    3: "严重",
    4: "紧急",
}

# Alarm status code -> label.
STATUS_NAMES = {
    "NEW": "待处理",
    "CONFIRMED": "已确认",
    "FALSE": "误报",
    "CLOSED": "已关闭",
}
def generate_alarm_report(
    time_range: str = "week",
) -> Optional[Tuple[str, io.BytesIO]]:
    """Build an Excel workbook summarising the alarms of a time window.

    The workbook has two sheets: a detail sheet with one row per alarm
    (newest first) and a summary sheet with counts grouped by alarm type,
    alarm status and alarm level.

    Args:
        time_range: ``"today"`` (since 00:00 today), ``"week"`` (since 00:00
            on Monday of the current week) or ``"month"`` (since 00:00 on the
            1st of the current month). Any other value is treated as
            ``"today"`` and a warning is logged.

    Returns:
        ``(filename, buffer)`` where ``buffer`` is a ``BytesIO`` rewound to
        position 0 that holds the ``.xlsx`` content, or ``None`` when the
        window contains no alarms or when generation fails (the failure is
        logged, not raised).
    """
    now = beijing_now()
    start, label = _resolve_report_window(now, time_range)

    db = get_session()
    try:
        alarms = (
            db.query(AlarmEvent)
            .filter(AlarmEvent.event_time >= start, AlarmEvent.event_time <= now)
            .order_by(AlarmEvent.event_time.desc())
            .all()
        )
        if not alarms:
            return None

        wb = Workbook()

        # Sheet 1: alarm details (reuses the workbook's default sheet).
        detail_sheet = wb.active
        detail_sheet.title = "告警明细"
        _write_detail_sheet(detail_sheet, alarms)

        # Sheet 2: aggregated statistics.
        _write_summary_sheet(wb.create_sheet("统计汇总"), alarms)

        # Serialise to memory and rewind so the caller can read from the start.
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)

        filename = f"告警报表_{label}.xlsx"
        logger.info(f"报表已生成: {filename}, 告警数={len(alarms)}")
        return (filename, output)
    except Exception as e:
        # Best-effort contract: callers get None instead of an exception.
        logger.error(f"生成报表失败: {e}", exc_info=True)
        return None
    finally:
        db.close()


def _resolve_report_window(now, time_range: str):
    """Return ``(start, label)`` for the window that ends at ``now``.

    ``label`` is the date fragment embedded in the report filename.
    """
    if time_range == "week":
        # weekday() is 0 for Monday, so this steps back to the current Monday.
        start = now - timedelta(days=now.weekday())
        start = start.replace(hour=0, minute=0, second=0, microsecond=0)
        return start, f"{start.strftime('%Y%m%d')}-{now.strftime('%Y%m%d')}"

    if time_range == "month":
        start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        return start, now.strftime("%Y%m")

    if time_range != "today":
        # Keep the historical fallback, but make the unexpected input visible.
        logger.warning(f"未知的时间范围: {time_range}, 按 today 处理")

    start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, now.strftime("%Y%m%d")


def _write_detail_sheet(ws, alarms) -> None:
    """Fill ``ws`` with a styled header row followed by one row per alarm."""
    headers = [
        "告警ID", "告警类型", "设备ID", "场景ID", "告警级别",
        "告警状态", "处理状态", "置信度", "事件时间", "处理人", "备注",
    ]
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(color="FFFFFF", bold=True, size=11)
    header_alignment = Alignment(horizontal="center")
    thin_border = Border(
        left=Side(style="thin"), right=Side(style="thin"),
        top=Side(style="thin"), bottom=Side(style="thin"),
    )

    for col, text in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=text)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment
        cell.border = thin_border

    # NOTE(review): openpyxl treats a string starting with "=" as a formula.
    # If handler / handle_remark can carry untrusted free text, consider
    # escaping such values before writing them - confirm with the callers.
    for row, a in enumerate(alarms, 2):
        values = [
            a.alarm_id,
            TYPE_NAMES.get(a.alarm_type, a.alarm_type),
            a.device_id,
            a.scene_id or "",
            LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level or "")),
            STATUS_NAMES.get(a.alarm_status, a.alarm_status or ""),
            a.handle_status or "",
            # Compare against None explicitly: a confidence of 0.0 is a real
            # value and must be rendered as "0.00", not as an empty cell.
            f"{a.confidence_score:.2f}" if a.confidence_score is not None else "",
            a.event_time.strftime("%Y-%m-%d %H:%M:%S") if a.event_time else "",
            a.handler or "",
            a.handle_remark or "",
        ]
        for col, value in enumerate(values, 1):
            ws.cell(row=row, column=col, value=value).border = thin_border

    # Auto-size columns from the longest rendered value, capped at 30.
    for col_cells in ws.columns:
        max_len = max(len(str(cell.value or "")) for cell in col_cells)
        ws.column_dimensions[col_cells[0].column_letter].width = min(max_len + 4, 30)


def _write_summary_sheet(ws, alarms) -> None:
    """Write the type, status and level count tables, stacked vertically."""
    # Counter keeps first-seen order, so rows follow the alarm query order.
    type_count = Counter(a.alarm_type for a in alarms)
    status_count = Counter(a.alarm_status for a in alarms)
    level_count = Counter(a.alarm_level for a in alarms)

    next_row = _write_count_table(
        ws, 1, "告警类型统计", "类型",
        [(TYPE_NAMES.get(t, t), c) for t, c in type_count.items()],
    )
    next_row = _write_count_table(
        ws, next_row, "告警状态统计", "状态",
        [(STATUS_NAMES.get(s, s), c) for s, c in status_count.items()],
    )
    _write_count_table(
        ws, next_row, "告警级别统计", "级别",
        [(LEVEL_NAMES.get(lv, str(lv)), c) for lv, c in level_count.items()],
    )


def _write_count_table(ws, top_row: int, title: str, key_header: str, rows) -> int:
    """Write one titled two-column count table and return the next table's top row.

    ``rows`` is a list of ``(label, count)`` pairs. The layout is: title row,
    header row, one row per entry, then a single blank spacer row.
    """
    ws.cell(row=top_row, column=1, value=title).font = Font(bold=True, size=12)
    ws.cell(row=top_row + 1, column=1, value=key_header)
    ws.cell(row=top_row + 1, column=2, value="数量")
    for row, (label, count) in enumerate(rows, top_row + 2):
        ws.cell(row=row, column=1, value=label)
        ws.cell(row=row, column=2, value=count)
    # title + header + entries + one blank row
    return top_row + len(rows) + 3