From 30db9d89615f634bce0afe36a00c349ae026ff2b Mon Sep 17 00:00:00 2001
From: 16337 <1633794139@qq.com>
Date: Fri, 3 Apr 2026 13:16:04 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E6=97=A5=E6=8A=A5?=
=?UTF-8?q?=E5=8D=87=E7=BA=A7=E4=B8=BA=E5=9B=BE=E6=96=87=E6=91=98=E8=A6=81?=
=?UTF-8?q?=E5=8A=A0=E8=AF=A6=E6=83=85=E6=8E=A8=E9=80=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
app/services/daily_report_service.py | 380 ++++++++++++++++++---------
1 file changed, 253 insertions(+), 127 deletions(-)
diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py
index 6f96a8b..d9938b9 100644
--- a/app/services/daily_report_service.py
+++ b/app/services/daily_report_service.py
@@ -4,15 +4,15 @@
每天定时生成前一天的工单汇总,发送到企微群聊。
数据源:IoT ops_order + 安保/保洁扩展表。
"""
+
import asyncio
-from collections import Counter, defaultdict
+from collections import Counter
from datetime import timedelta
from typing import Dict, List, Optional
-from app.utils.logger import logger
from app.config import settings
+from app.utils.logger import logger
-# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
@@ -20,30 +20,65 @@ ALARM_TYPE_NAMES = {
"vehicle_congestion": "车辆拥堵",
}
-# 保洁类型映射
CLEANING_TYPE_NAMES = {
- "ROUTINE": "日常保洁", "DEEP": "深度保洁",
- "SPOT": "点状保洁", "EMERGENCY": "应急保洁",
-}
-
-# 工单状态映射
-ORDER_STATUS_NAMES = {
- "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
- "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
+ "ROUTINE": "日常保洁",
+ "DEEP": "深度保洁",
+ "SPOT": "点状保洁",
+ "EMERGENCY": "应急保洁",
}
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
+REPORT_COVER_IMAGE_URL = "https://wework.qpic.cn/wwpic/252813_jOfDHtcISzuodLa_1629280209/0"
-def _format_resp_time(minutes: float) -> str:
- """格式化响应时长"""
+def _format_duration(minutes: float) -> str:
+ if minutes <= 0:
+ return "暂无数据"
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
-async def generate_daily_report() -> Optional[str]:
- """生成昨日工单日报 Markdown 内容"""
+def _format_ratio(numerator: int, denominator: int) -> str:
+ if denominator <= 0:
+ return "0%"
+ return f"{(numerator / denominator) * 100:.1f}%"
+
+
+def _format_change(current: int, previous: int) -> str:
+ if previous <= 0:
+ return "前日无工单"
+ change_pct = (current - previous) / previous * 100
+ if change_pct > 0:
+ return f"前日{previous}条,↑{change_pct:.1f}%"
+ if change_pct < 0:
+ return f"前日{previous}条,↓{abs(change_pct):.1f}%"
+ return f"前日{previous}条,持平"
+
+
+def _top_summary(counter: Counter, mapping: Optional[Dict[str, str]] = None, top_n: int = 3) -> str:
+ if not counter:
+ return "暂无数据"
+ parts = []
+ for key, count in counter.most_common(top_n):
+ name = mapping.get(key, key) if mapping else key
+ parts.append(f"{name} {count}")
+ return ",".join(parts)
+
+
+def _location_name(order) -> str:
+ if getattr(order, "location", None):
+ return order.location
+ if getattr(order, "area_id", None):
+ return f"区域{order.area_id}"
+ return "未标注区域"
+
+
+def _safe_avg(values: List[float]) -> float:
+ return sum(values) / len(values) if values else 0.0
+
+
+async def _build_daily_report_data() -> Optional[Dict]:
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -54,11 +89,12 @@ async def generate_daily_report() -> Optional[str]:
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
- # 查询 IoT 工单
try:
from app.models_iot import (
- get_iot_session, IotOpsOrder,
- IotOpsOrderSecurityExt, IotOpsOrderCleanExt,
+ get_iot_session,
+ IotOpsOrder,
+ IotOpsOrderSecurityExt,
+ IotOpsOrderCleanExt,
)
except Exception as e:
logger.error(f"IoT数据库不可用,日报生成失败: {e}")
@@ -66,7 +102,6 @@ async def generate_daily_report() -> Optional[str]:
db = get_iot_session()
try:
- # 昨日工单
yesterday_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= yesterday_start,
IotOpsOrder.create_time < today_start,
@@ -74,38 +109,58 @@ async def generate_daily_report() -> Optional[str]:
).all()
yesterday_total = len(yesterday_orders)
- # 前日工单(用于环比)
prev_total = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= day_before_start,
IotOpsOrder.create_time < yesterday_start,
IotOpsOrder.deleted == 0,
).count()
- if yesterday_total == 0:
- return (
- f"**物业工单日报 — {date_str}({weekday})**\n\n"
- f">昨日工单总计:**0** 条\n"
- f">系统运行正常,无工单"
- )
+ current_open_orders = db.query(IotOpsOrder).filter(
+ IotOpsOrder.deleted == 0,
+ IotOpsOrder.status.notin_(("COMPLETED", "CANCELLED")),
+ ).all()
+
+ if not yesterday_orders and not current_open_orders:
+ return {
+ "date_str": date_str,
+ "weekday": weekday,
+ "empty": True,
+ "title": f"物业工单日报 {date_str}({weekday})",
+ "subtitle": "昨日系统运行平稳",
+ "overview": "昨日无新增工单,当前无待处理工单。",
+ "summary": {
+ "yesterday_total": 0,
+ "completed_count": 0,
+ "backlog_count": 0,
+ "avg_resp": "暂无数据",
+ "avg_close": "暂无数据",
+ "false_alarm_rate": "0%",
+ },
+ "risk_lines": [
+ "安保高发:暂无数据",
+ "高发区域:暂无数据",
+ "高发摄像头:暂无数据",
+ "超时未处理:0 条",
+ ],
+ "change_str": "前日无工单",
+ "top_overdue": [],
+ }
- # 收集 order_ids
order_ids = [o.id for o in yesterday_orders]
-
- # 批量查安保扩展
sec_ext_map = {}
- sec_exts = db.query(IotOpsOrderSecurityExt).filter(
- IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
- IotOpsOrderSecurityExt.deleted == 0,
- ).all()
- sec_ext_map = {e.ops_order_id: e for e in sec_exts}
-
- # 批量查保洁扩展
clean_ext_map = {}
- clean_exts = db.query(IotOpsOrderCleanExt).filter(
- IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
- IotOpsOrderCleanExt.deleted == 0,
- ).all()
- clean_ext_map = {e.ops_order_id: e for e in clean_exts}
+ if order_ids:
+ sec_exts = db.query(IotOpsOrderSecurityExt).filter(
+ IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
+ IotOpsOrderSecurityExt.deleted == 0,
+ ).all()
+ sec_ext_map = {e.ops_order_id: e for e in sec_exts}
+
+ clean_exts = db.query(IotOpsOrderCleanExt).filter(
+ IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
+ IotOpsOrderCleanExt.deleted == 0,
+ ).all()
+ clean_ext_map = {e.ops_order_id: e for e in clean_exts}
except Exception as e:
logger.error(f"查询IoT工单失败: {e}", exc_info=True)
@@ -113,47 +168,59 @@ async def generate_daily_report() -> Optional[str]:
finally:
db.close()
- # ---- 统计 ----
type_count = {"SECURITY": 0, "CLEAN": 0}
status_count = Counter()
alarm_type_count = Counter()
- camera_code_counter = Counter() # 先按 camera_code 统计
+ area_counter = Counter()
+ camera_code_counter = Counter()
false_alarm_count = 0
response_times: List[float] = []
+ close_times: List[float] = []
cleaning_type_count = Counter()
- for o in yesterday_orders:
- ot = o.order_type or "SECURITY"
- type_count[ot] = type_count.get(ot, 0) + 1
- status_count[o.status or "PENDING"] += 1
+ for order in yesterday_orders:
+ order_type = order.order_type or "SECURITY"
+ type_count[order_type] = type_count.get(order_type, 0) + 1
+ status_count[order.status or "PENDING"] += 1
+ area_counter[_location_name(order)] += 1
- # 安保统计
- sec_ext = sec_ext_map.get(o.id)
+ sec_ext = sec_ext_map.get(order.id)
if sec_ext:
if sec_ext.alarm_type:
alarm_type_count[sec_ext.alarm_type] += 1
- # 统一用 camera_id(即 camera_code)做 key,后续批量解析名称
cam_key = sec_ext.camera_id or sec_ext.camera_name
if cam_key:
camera_code_counter[cam_key] += 1
if sec_ext.false_alarm == 1:
false_alarm_count += 1
- # 响应时长:dispatched → confirmed
if sec_ext.dispatched_time and sec_ext.confirmed_time:
delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
+ if sec_ext.dispatched_time and sec_ext.completed_time:
+ delta = (sec_ext.completed_time - sec_ext.dispatched_time).total_seconds() / 60.0
+ if 0 <= delta <= 24 * 60:
+ close_times.append(delta)
- # 保洁统计
- clean_ext = clean_ext_map.get(o.id)
- if clean_ext and clean_ext.cleaning_type:
- cleaning_type_count[clean_ext.cleaning_type] += 1
+ clean_ext = clean_ext_map.get(order.id)
+ if clean_ext:
+ if clean_ext.cleaning_type:
+ cleaning_type_count[clean_ext.cleaning_type] += 1
+ dispatch_time = clean_ext.first_dispatched_time or clean_ext.dispatched_time
+ if dispatch_time and clean_ext.arrived_time:
+ delta = (clean_ext.arrived_time - dispatch_time).total_seconds() / 60.0
+ if 0 <= delta <= 360:
+ response_times.append(delta)
+ if dispatch_time and clean_ext.completed_time:
+ delta = (clean_ext.completed_time - dispatch_time).total_seconds() / 60.0
+ if 0 <= delta <= 24 * 60:
+ close_times.append(delta)
- # 批量解析摄像头名称(camera_code → 真实名称)
camera_counter = Counter()
if camera_code_counter:
try:
from app.services.camera_name_service import get_camera_name_service
+
cam_svc = get_camera_name_service()
name_map = await cam_svc.get_display_names_batch(list(camera_code_counter.keys()))
for code, count in camera_code_counter.items():
@@ -163,78 +230,124 @@ async def generate_daily_report() -> Optional[str]:
logger.warning(f"摄像头名称解析失败,降级使用代码: {e}")
camera_counter = camera_code_counter
- # 环比
- if prev_total > 0:
- change_pct = (yesterday_total - prev_total) / prev_total * 100
- if change_pct > 0:
- change_str = f"前日{prev_total}条,↑{change_pct:.1f}%"
- elif change_pct < 0:
- change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
- else:
- change_str = f"前日{prev_total}条,持平"
- else:
- change_str = "前日无工单"
-
- # 平均响应时长
- resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
-
- # 待处理数量
- pending_count = sum(
- 1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED")
- )
+ backlog_count = len(current_open_orders)
+ carry_over_count = sum(1 for order in current_open_orders if order.create_time and order.create_time < today_start)
completed_count = status_count.get("COMPLETED", 0)
- cancelled_count = status_count.get("CANCELLED", 0)
+ avg_resp = _format_duration(_safe_avg(response_times))
+ avg_close = _format_duration(_safe_avg(close_times))
+ false_alarm_rate = _format_ratio(false_alarm_count, type_count.get("SECURITY", 0))
- # ==================== 组装 Markdown ====================
+ overdue_orders = sorted(
+ (
+ order for order in current_open_orders
+ if order.create_time and order.create_time < today_start
+ ),
+ key=lambda item: item.create_time,
+ )
+ top_overdue = []
+ for order in overdue_orders[:3]:
+ age_minutes = max(int((now - order.create_time).total_seconds() / 60), 0)
+ top_overdue.append(
+ f"{order.order_code or order.id}|{_location_name(order)}|挂起 {age_minutes} 分钟"
+ )
+
+ report = {
+ "date_str": date_str,
+ "weekday": weekday,
+ "empty": False,
+ "title": f"物业工单日报 {date_str}({weekday})",
+ "subtitle": "昨日运营概览",
+ "overview": f"昨日新增 {yesterday_total}|完成 {completed_count}|当前待处理 {backlog_count}",
+ "change_str": _format_change(yesterday_total, prev_total),
+ "summary": {
+ "yesterday_total": yesterday_total,
+ "completed_count": completed_count,
+ "backlog_count": backlog_count,
+ "security_count": type_count.get("SECURITY", 0),
+ "clean_count": type_count.get("CLEAN", 0),
+ "avg_resp": avg_resp,
+ "avg_close": avg_close,
+ "false_alarm_rate": false_alarm_rate,
+ "carry_over_count": carry_over_count,
+ "cancelled_count": status_count.get("CANCELLED", 0),
+ },
+ "risk_lines": [
+ f"安保高发:{_top_summary(alarm_type_count, ALARM_TYPE_NAMES)}",
+ f"高发区域:{_top_summary(area_counter)}",
+ f"高发摄像头:{_top_summary(camera_counter)}",
+ f"超时未处理:{carry_over_count} 条",
+ ],
+ "tops": {
+ "alarm_types": _top_summary(alarm_type_count, ALARM_TYPE_NAMES),
+ "areas": _top_summary(area_counter),
+ "cameras": _top_summary(camera_counter),
+ "cleaning_types": _top_summary(cleaning_type_count, CLEANING_TYPE_NAMES),
+ },
+ "top_overdue": top_overdue,
+ }
+ return report
+
+
+def _build_preview_text(report: Dict) -> str:
+ if report.get("empty"):
+ return (
+ f"**{report['title']}**\n\n"
+ f">昨日新增:0 条\n"
+ f">当前待处理:0 条\n"
+ f">系统运行平稳"
+ )
+
+ summary = report["summary"]
lines = [
- f"**物业工单日报 — {date_str}({weekday})**",
+ f"**{report['title']}**",
"",
- f">昨日工单总计:{yesterday_total} 条({change_str})",
+ f">昨日新增:{summary['yesterday_total']} 条({report['change_str']})",
+ f">昨日完成:{summary['completed_count']} 条 | 当前待处理:{summary['backlog_count']} 条",
+ f">安保:{summary['security_count']} 条 | 保洁:{summary['clean_count']} 条",
+ f">平均首响:{summary['avg_resp']} | 平均完结:{summary['avg_close']}",
+ f">误报率:{summary['false_alarm_rate']} | 遗留待处理:{summary['carry_over_count']} 条",
+ "",
+ "**重点风险**",
]
-
- # 按工单类型
- sec_count = type_count.get("SECURITY", 0)
- clean_count = type_count.get("CLEAN", 0)
- if sec_count and clean_count:
- lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}条")
- elif sec_count:
- lines.append(f">安保工单:{sec_count}条")
- elif clean_count:
- lines.append(f">保洁工单:{clean_count}条")
-
- lines.append(f">待处理:{pending_count} 条 | "
- f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}条")
- lines.append(f">平均响应:{resp_str}")
-
- # 安保告警类型分布
- if alarm_type_count:
+ lines.extend(f">{line}" for line in report["risk_lines"])
+ if report["top_overdue"]:
lines.append("")
- lines.append("**安保告警类型分布**")
- for alarm_type, count in alarm_type_count.most_common():
- type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
- lines.append(f">{type_name}:{count}条")
-
- # 保洁类型分布
- if cleaning_type_count:
- lines.append("")
- lines.append("**保洁类型分布**")
- for ct, count in cleaning_type_count.most_common():
- ct_name = CLEANING_TYPE_NAMES.get(ct, ct)
- lines.append(f">{ct_name}:{count}条")
-
- # 摄像头 Top5
- top5_cameras = camera_counter.most_common(5)
- if top5_cameras:
- lines.append("")
- lines.append("**告警摄像头 Top5**")
- for i, (cam_name, count) in enumerate(top5_cameras, 1):
- lines.append(f">{i}. {cam_name} — {count}条")
-
+ lines.append("**需优先跟进**")
+ lines.extend(f">{idx}. {item}" for idx, item in enumerate(report["top_overdue"], start=1))
return "\n".join(lines)
+def _build_report_news(report: Dict) -> Dict:
+ summary = report["summary"]
+ click_url = settings.wechat.service_base_url or "https://work.weixin.qq.com"
+ description = (
+ f"{report['overview']}\n"
+ f"首响 {summary['avg_resp']}|完结 {summary['avg_close']}|误报率 {summary['false_alarm_rate']}\n"
+ f"高发区域:{report['tops']['areas']}"
+ )
+ return {
+ "title": report["title"],
+ "description": description,
+ "url": click_url,
+ "picurl": REPORT_COVER_IMAGE_URL,
+ }
+
+
+async def generate_daily_report() -> Optional[str]:
+ report = await _build_daily_report_data()
+ if not report:
+ return None
+ return _build_preview_text(report)
+
+
+async def generate_daily_report_news() -> Optional[Dict]:
+ report = await _build_daily_report_data()
+ if not report:
+ return None
+ return _build_report_news(report)
+
+
async def _send_daily_report():
- """生成并发送日报"""
from app.services.wechat_service import get_wechat_service
chat_id = settings.wechat.group_chat_id
@@ -243,23 +356,38 @@ async def _send_daily_report():
return
try:
- content = await generate_daily_report()
- if not content:
+ news = await generate_daily_report_news()
+ preview = await generate_daily_report()
+ if not news or not preview:
logger.info("日报生成内容为空,跳过发送")
return
wechat_svc = get_wechat_service()
- ok = await wechat_svc.send_group_markdown(chat_id, content)
+ ok = await wechat_svc.send_group_news(
+ chat_id=chat_id,
+ title=news["title"],
+ description=news["description"],
+ url=news["url"],
+ picurl=news["picurl"],
+ )
if ok:
- logger.info("日报已发送到企微群聊")
+ await asyncio.sleep(1)
+ detail_ok = await wechat_svc.send_group_markdown(chat_id, preview)
+ if detail_ok:
+ logger.info("日报图文+摘要已发送到企微群聊")
+ else:
+ logger.warning("日报图文已发送,但摘要markdown发送失败")
else:
- logger.error("日报发送失败")
+ fallback_ok = await wechat_svc.send_group_markdown(chat_id, preview)
+ if fallback_ok:
+ logger.info("日报图文发送失败,已降级为markdown发送")
+ else:
+ logger.error("日报发送失败")
except Exception:
logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
- """计算距离下一个 HH:MM 的秒数"""
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -270,7 +398,6 @@ def _seconds_until(hour: int, minute: int) -> float:
async def start_daily_report_scheduler():
- """日报定时调度主循环"""
hour = settings.daily_report.send_hour
minute = settings.daily_report.send_minute
logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
@@ -281,7 +408,6 @@ async def start_daily_report_scheduler():
logger.debug(f"日报下次发送倒计时 {wait:.0f} 秒")
await asyncio.sleep(wait)
await _send_daily_report()
- # 发送完等 61 秒,避免同一分钟内重复触发
await asyncio.sleep(61)
except asyncio.CancelledError:
logger.info("日报定时任务已停止")