优化:日报升级为图文摘要加详情推送

This commit is contained in:
2026-04-03 13:16:04 +08:00
parent bfcd3b9a35
commit 30db9d8961

View File

@@ -4,15 +4,15 @@
每天定时生成前一天的工单汇总,发送到企微群聊。
数据源IoT ops_order + 安保/保洁扩展表。
"""
import asyncio
from collections import Counter, defaultdict
from collections import Counter
from datetime import timedelta
from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
from app.utils.logger import logger
# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
@@ -20,30 +20,65 @@ ALARM_TYPE_NAMES = {
"vehicle_congestion": "车辆拥堵",
}
# 保洁类型映射
CLEANING_TYPE_NAMES = {
"ROUTINE": "日常保洁", "DEEP": "深度保洁",
"SPOT": "点状保洁", "EMERGENCY": "应急保洁",
}
# 工单状态映射
ORDER_STATUS_NAMES = {
"PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
"PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
"ROUTINE": "日常保洁",
"DEEP": "深度保洁",
"SPOT": "点状保洁",
"EMERGENCY": "应急保洁",
}
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
REPORT_COVER_IMAGE_URL = "https://wework.qpic.cn/wwpic/252813_jOfDHtcISzuodLa_1629280209/0"
def _format_resp_time(minutes: float) -> str:
"""格式化响应时长"""
def _format_duration(minutes: float) -> str:
if minutes <= 0:
return "暂无数据"
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
async def generate_daily_report() -> Optional[str]:
"""生成昨日工单日报 Markdown 内容"""
def _format_ratio(numerator: int, denominator: int) -> str:
if denominator <= 0:
return "0%"
return f"{(numerator / denominator) * 100:.1f}%"
def _format_change(current: int, previous: int) -> str:
if previous <= 0:
return "前日无工单"
change_pct = (current - previous) / previous * 100
if change_pct > 0:
return f"前日{previous}条,↑{change_pct:.1f}%"
if change_pct < 0:
return f"前日{previous}条,↓{abs(change_pct):.1f}%"
return f"前日{previous}条,持平"
def _top_summary(counter: Counter, mapping: Optional[Dict[str, str]] = None, top_n: int = 3) -> str:
if not counter:
return "暂无数据"
parts = []
for key, count in counter.most_common(top_n):
name = mapping.get(key, key) if mapping else key
parts.append(f"{name} {count}")
return "".join(parts)
def _location_name(order) -> str:
if getattr(order, "location", None):
return order.location
if getattr(order, "area_id", None):
return f"区域{order.area_id}"
return "未标注区域"
def _safe_avg(values: List[float]) -> float:
return sum(values) / len(values) if values else 0.0
async def _build_daily_report_data() -> Optional[Dict]:
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -54,11 +89,12 @@ async def generate_daily_report() -> Optional[str]:
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
# 查询 IoT 工单
try:
from app.models_iot import (
get_iot_session, IotOpsOrder,
IotOpsOrderSecurityExt, IotOpsOrderCleanExt,
get_iot_session,
IotOpsOrder,
IotOpsOrderSecurityExt,
IotOpsOrderCleanExt,
)
except Exception as e:
logger.error(f"IoT数据库不可用日报生成失败: {e}")
@@ -66,7 +102,6 @@ async def generate_daily_report() -> Optional[str]:
db = get_iot_session()
try:
# 昨日工单
yesterday_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= yesterday_start,
IotOpsOrder.create_time < today_start,
@@ -74,38 +109,58 @@ async def generate_daily_report() -> Optional[str]:
).all()
yesterday_total = len(yesterday_orders)
# 前日工单(用于环比)
prev_total = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= day_before_start,
IotOpsOrder.create_time < yesterday_start,
IotOpsOrder.deleted == 0,
).count()
if yesterday_total == 0:
return (
f"**物业工单日报 — {date_str}{weekday}**\n\n"
f">昨日工单总计:**0** 条\n"
f">系统运行正常,无工单"
)
current_open_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.deleted == 0,
IotOpsOrder.status.notin_(("COMPLETED", "CANCELLED")),
).all()
if not yesterday_orders and not current_open_orders:
return {
"date_str": date_str,
"weekday": weekday,
"empty": True,
"title": f"物业工单日报 {date_str}{weekday}",
"subtitle": "昨日系统运行平稳",
"overview": "昨日无新增工单,当前无待处理工单。",
"summary": {
"yesterday_total": 0,
"completed_count": 0,
"backlog_count": 0,
"avg_resp": "暂无数据",
"avg_close": "暂无数据",
"false_alarm_rate": "0%",
},
"risk_lines": [
"安保高发:暂无数据",
"高发区域:暂无数据",
"高发摄像头:暂无数据",
"超时未处理0 条",
],
"change_str": "前日无工单",
"top_overdue": [],
}
# 收集 order_ids
order_ids = [o.id for o in yesterday_orders]
# 批量查安保扩展
sec_ext_map = {}
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
# 批量查保洁扩展
clean_ext_map = {}
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
if order_ids:
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
except Exception as e:
logger.error(f"查询IoT工单失败: {e}", exc_info=True)
@@ -113,47 +168,59 @@ async def generate_daily_report() -> Optional[str]:
finally:
db.close()
# ---- 统计 ----
type_count = {"SECURITY": 0, "CLEAN": 0}
status_count = Counter()
alarm_type_count = Counter()
camera_code_counter = Counter() # 先按 camera_code 统计
area_counter = Counter()
camera_code_counter = Counter()
false_alarm_count = 0
response_times: List[float] = []
close_times: List[float] = []
cleaning_type_count = Counter()
for o in yesterday_orders:
ot = o.order_type or "SECURITY"
type_count[ot] = type_count.get(ot, 0) + 1
status_count[o.status or "PENDING"] += 1
for order in yesterday_orders:
order_type = order.order_type or "SECURITY"
type_count[order_type] = type_count.get(order_type, 0) + 1
status_count[order.status or "PENDING"] += 1
area_counter[_location_name(order)] += 1
# 安保统计
sec_ext = sec_ext_map.get(o.id)
sec_ext = sec_ext_map.get(order.id)
if sec_ext:
if sec_ext.alarm_type:
alarm_type_count[sec_ext.alarm_type] += 1
# 统一用 camera_id即 camera_code做 key后续批量解析名称
cam_key = sec_ext.camera_id or sec_ext.camera_name
if cam_key:
camera_code_counter[cam_key] += 1
if sec_ext.false_alarm == 1:
false_alarm_count += 1
# 响应时长dispatched → confirmed
if sec_ext.dispatched_time and sec_ext.confirmed_time:
delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
if sec_ext.dispatched_time and sec_ext.completed_time:
delta = (sec_ext.completed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 24 * 60:
close_times.append(delta)
# 保洁统计
clean_ext = clean_ext_map.get(o.id)
if clean_ext and clean_ext.cleaning_type:
cleaning_type_count[clean_ext.cleaning_type] += 1
clean_ext = clean_ext_map.get(order.id)
if clean_ext:
if clean_ext.cleaning_type:
cleaning_type_count[clean_ext.cleaning_type] += 1
dispatch_time = clean_ext.first_dispatched_time or clean_ext.dispatched_time
if dispatch_time and clean_ext.arrived_time:
delta = (clean_ext.arrived_time - dispatch_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
if dispatch_time and clean_ext.completed_time:
delta = (clean_ext.completed_time - dispatch_time).total_seconds() / 60.0
if 0 <= delta <= 24 * 60:
close_times.append(delta)
# 批量解析摄像头名称camera_code → 真实名称)
camera_counter = Counter()
if camera_code_counter:
try:
from app.services.camera_name_service import get_camera_name_service
cam_svc = get_camera_name_service()
name_map = await cam_svc.get_display_names_batch(list(camera_code_counter.keys()))
for code, count in camera_code_counter.items():
@@ -163,78 +230,124 @@ async def generate_daily_report() -> Optional[str]:
logger.warning(f"摄像头名称解析失败,降级使用代码: {e}")
camera_counter = camera_code_counter
# 环比
if prev_total > 0:
change_pct = (yesterday_total - prev_total) / prev_total * 100
if change_pct > 0:
change_str = f"前日{prev_total}条,↑{change_pct:.1f}%"
elif change_pct < 0:
change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
else:
change_str = f"前日{prev_total}条,持平"
else:
change_str = "前日无工单"
# 平均响应时长
resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
# 待处理数量
pending_count = sum(
1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED")
)
backlog_count = len(current_open_orders)
carry_over_count = sum(1 for order in current_open_orders if order.create_time and order.create_time < today_start)
completed_count = status_count.get("COMPLETED", 0)
cancelled_count = status_count.get("CANCELLED", 0)
avg_resp = _format_duration(_safe_avg(response_times))
avg_close = _format_duration(_safe_avg(close_times))
false_alarm_rate = _format_ratio(false_alarm_count, type_count.get("SECURITY", 0))
# ==================== 组装 Markdown ====================
overdue_orders = sorted(
(
order for order in current_open_orders
if order.create_time and order.create_time < today_start
),
key=lambda item: item.create_time,
)
top_overdue = []
for order in overdue_orders[:3]:
age_minutes = max(int((now - order.create_time).total_seconds() / 60), 0)
top_overdue.append(
f"{order.order_code or order.id}{_location_name(order)}|挂起 {age_minutes} 分钟"
)
report = {
"date_str": date_str,
"weekday": weekday,
"empty": False,
"title": f"物业工单日报 {date_str}{weekday}",
"subtitle": "昨日运营概览",
"overview": f"昨日新增 {yesterday_total}|完成 {completed_count}|当前待处理 {backlog_count}",
"change_str": _format_change(yesterday_total, prev_total),
"summary": {
"yesterday_total": yesterday_total,
"completed_count": completed_count,
"backlog_count": backlog_count,
"security_count": type_count.get("SECURITY", 0),
"clean_count": type_count.get("CLEAN", 0),
"avg_resp": avg_resp,
"avg_close": avg_close,
"false_alarm_rate": false_alarm_rate,
"carry_over_count": carry_over_count,
"cancelled_count": status_count.get("CANCELLED", 0),
},
"risk_lines": [
f"安保高发:{_top_summary(alarm_type_count, ALARM_TYPE_NAMES)}",
f"高发区域:{_top_summary(area_counter)}",
f"高发摄像头:{_top_summary(camera_counter)}",
f"超时未处理:{carry_over_count}",
],
"tops": {
"alarm_types": _top_summary(alarm_type_count, ALARM_TYPE_NAMES),
"areas": _top_summary(area_counter),
"cameras": _top_summary(camera_counter),
"cleaning_types": _top_summary(cleaning_type_count, CLEANING_TYPE_NAMES),
},
"top_overdue": top_overdue,
}
return report
def _build_preview_text(report: Dict) -> str:
if report.get("empty"):
return (
f"**{report['title']}**\n\n"
f">昨日新增0 条\n"
f">当前待处理0 条\n"
f">系统运行平稳"
)
summary = report["summary"]
lines = [
f"**物业工单日报 — {date_str}{weekday}**",
f"**{report['title']}**",
"",
f">昨日工单总计<font color=\"warning\">{yesterday_total}</font> 条({change_str}",
f">昨日新增<font color=\"warning\">{summary['yesterday_total']}</font> 条({report['change_str']}",
f">昨日完成:{summary['completed_count']} 条 | 当前待处理:<font color=\"warning\">{summary['backlog_count']}</font> 条",
f">安保:{summary['security_count']} 条 | 保洁:{summary['clean_count']}",
f">平均首响:<font color=\"info\">{summary['avg_resp']}</font> | 平均完结:<font color=\"info\">{summary['avg_close']}</font>",
f">误报率:{summary['false_alarm_rate']} | 遗留待处理:{summary['carry_over_count']}",
"",
"**重点风险**",
]
# 按工单类型
sec_count = type_count.get("SECURITY", 0)
clean_count = type_count.get("CLEAN", 0)
if sec_count and clean_count:
lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}")
elif sec_count:
lines.append(f">安保工单:{sec_count}")
elif clean_count:
lines.append(f">保洁工单:{clean_count}")
lines.append(f">待处理:<font color=\"warning\">{pending_count}</font> 条 | "
f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}")
lines.append(f">平均响应:<font color=\"info\">{resp_str}</font>")
# 安保告警类型分布
if alarm_type_count:
lines.extend(f">{line}" for line in report["risk_lines"])
if report["top_overdue"]:
lines.append("")
lines.append("**安保告警类型分布**")
for alarm_type, count in alarm_type_count.most_common():
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
lines.append(f">{type_name}{count}")
# 保洁类型分布
if cleaning_type_count:
lines.append("")
lines.append("**保洁类型分布**")
for ct, count in cleaning_type_count.most_common():
ct_name = CLEANING_TYPE_NAMES.get(ct, ct)
lines.append(f">{ct_name}{count}")
# 摄像头 Top5
top5_cameras = camera_counter.most_common(5)
if top5_cameras:
lines.append("")
lines.append("**告警摄像头 Top5**")
for i, (cam_name, count) in enumerate(top5_cameras, 1):
lines.append(f">{i}. {cam_name}{count}")
lines.append("**需优先跟进**")
lines.extend(f">{idx}. {item}" for idx, item in enumerate(report["top_overdue"], start=1))
return "\n".join(lines)
def _build_report_news(report: Dict) -> Dict:
summary = report["summary"]
click_url = settings.wechat.service_base_url or "https://work.weixin.qq.com"
description = (
f"{report['overview']}\n"
f"首响 {summary['avg_resp']}|完结 {summary['avg_close']}|误报率 {summary['false_alarm_rate']}\n"
f"高发区域:{report['tops']['areas']}"
)
return {
"title": report["title"],
"description": description,
"url": click_url,
"picurl": REPORT_COVER_IMAGE_URL,
}
async def generate_daily_report() -> Optional[str]:
report = await _build_daily_report_data()
if not report:
return None
return _build_preview_text(report)
async def generate_daily_report_news() -> Optional[Dict]:
report = await _build_daily_report_data()
if not report:
return None
return _build_report_news(report)
async def _send_daily_report():
"""生成并发送日报"""
from app.services.wechat_service import get_wechat_service
chat_id = settings.wechat.group_chat_id
@@ -243,23 +356,38 @@ async def _send_daily_report():
return
try:
content = await generate_daily_report()
if not content:
news = await generate_daily_report_news()
preview = await generate_daily_report()
if not news or not preview:
logger.info("日报生成内容为空,跳过发送")
return
wechat_svc = get_wechat_service()
ok = await wechat_svc.send_group_markdown(chat_id, content)
ok = await wechat_svc.send_group_news(
chat_id=chat_id,
title=news["title"],
description=news["description"],
url=news["url"],
picurl=news["picurl"],
)
if ok:
logger.info("日报已发送到企微群聊")
await asyncio.sleep(1)
detail_ok = await wechat_svc.send_group_markdown(chat_id, preview)
if detail_ok:
logger.info("日报图文+摘要已发送到企微群聊")
else:
logger.warning("日报图文已发送但摘要markdown发送失败")
else:
logger.error("日报发送失败")
fallback_ok = await wechat_svc.send_group_markdown(chat_id, preview)
if fallback_ok:
logger.info("日报图文发送失败已降级为markdown发送")
else:
logger.error("日报发送失败")
except Exception:
logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
"""计算距离下一个 HH:MM 的秒数"""
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -270,7 +398,6 @@ def _seconds_until(hour: int, minute: int) -> float:
async def start_daily_report_scheduler():
"""日报定时调度主循环"""
hour = settings.daily_report.send_hour
minute = settings.daily_report.send_minute
logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
@@ -281,7 +408,6 @@ async def start_daily_report_scheduler():
logger.debug(f"日报下次发送倒计时 {wait:.0f}")
await asyncio.sleep(wait)
await _send_daily_report()
# 发送完等 61 秒,避免同一分钟内重复触发
await asyncio.sleep(61)
except asyncio.CancelledError:
logger.info("日报定时任务已停止")