diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py index 6f96a8b..d9938b9 100644 --- a/app/services/daily_report_service.py +++ b/app/services/daily_report_service.py @@ -4,15 +4,15 @@ 每天定时生成前一天的工单汇总,发送到企微群聊。 数据源:IoT ops_order + 安保/保洁扩展表。 """ + import asyncio -from collections import Counter, defaultdict +from collections import Counter from datetime import timedelta from typing import Dict, List, Optional -from app.utils.logger import logger from app.config import settings +from app.utils.logger import logger -# 告警类型中文映射 ALARM_TYPE_NAMES = { "leave_post": "人员离岗", "intrusion": "周界入侵", @@ -20,30 +20,65 @@ ALARM_TYPE_NAMES = { "vehicle_congestion": "车辆拥堵", } -# 保洁类型映射 CLEANING_TYPE_NAMES = { - "ROUTINE": "日常保洁", "DEEP": "深度保洁", - "SPOT": "点状保洁", "EMERGENCY": "应急保洁", -} - -# 工单状态映射 -ORDER_STATUS_NAMES = { - "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗", - "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消", + "ROUTINE": "日常保洁", + "DEEP": "深度保洁", + "SPOT": "点状保洁", + "EMERGENCY": "应急保洁", } WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] +REPORT_COVER_IMAGE_URL = "https://wework.qpic.cn/wwpic/252813_jOfDHtcISzuodLa_1629280209/0" -def _format_resp_time(minutes: float) -> str: - """格式化响应时长""" +def _format_duration(minutes: float) -> str: + if minutes <= 0: + return "暂无数据" if minutes < 60: return f"{minutes:.1f}分钟" return f"{minutes / 60:.1f}小时" -async def generate_daily_report() -> Optional[str]: - """生成昨日工单日报 Markdown 内容""" +def _format_ratio(numerator: int, denominator: int) -> str: + if denominator <= 0: + return "0%" + return f"{(numerator / denominator) * 100:.1f}%" + + +def _format_change(current: int, previous: int) -> str: + if previous <= 0: + return "前日无工单" + change_pct = (current - previous) / previous * 100 + if change_pct > 0: + return f"前日{previous}条,↑{change_pct:.1f}%" + if change_pct < 0: + return f"前日{previous}条,↓{abs(change_pct):.1f}%" + return f"前日{previous}条,持平" + + +def _top_summary(counter: Counter, mapping: Optional[Dict[str, str]] = None, top_n: int = 3) -> str: + if not counter: + return "暂无数据" + parts = [] + for key, count in counter.most_common(top_n): + name = mapping.get(key, key) if mapping else key + parts.append(f"{name} {count}") + return ",".join(parts) + + +def _location_name(order) -> str: + if getattr(order, "location", None): + return order.location + if getattr(order, "area_id", None): + return f"区域{order.area_id}" + return "未标注区域" + + +def _safe_avg(values: List[float]) -> float: + return sum(values) / len(values) if values else 0.0 + + +async def _build_daily_report_data() -> Optional[Dict]: from app.utils.timezone import beijing_now now = beijing_now() @@ -54,11 +89,12 @@ async def generate_daily_report() -> Optional[str]: date_str = yesterday_start.strftime("%m-%d") weekday = WEEKDAY_NAMES[yesterday_start.weekday()] - # 查询 IoT 工单 try: from app.models_iot import ( - get_iot_session, IotOpsOrder, - IotOpsOrderSecurityExt, IotOpsOrderCleanExt, + get_iot_session, + IotOpsOrder, + IotOpsOrderSecurityExt, + IotOpsOrderCleanExt, ) except Exception as e: logger.error(f"IoT数据库不可用,日报生成失败: {e}") @@ -66,7 +102,6 @@ async def generate_daily_report() -> Optional[str]: db = get_iot_session() try: - # 昨日工单 yesterday_orders = db.query(IotOpsOrder).filter( IotOpsOrder.create_time >= yesterday_start, IotOpsOrder.create_time < today_start, @@ -74,38 +109,58 @@ async def generate_daily_report() -> Optional[str]: ).all() yesterday_total = len(yesterday_orders) - # 前日工单(用于环比) prev_total = db.query(IotOpsOrder).filter( IotOpsOrder.create_time >= day_before_start, IotOpsOrder.create_time < yesterday_start, IotOpsOrder.deleted == 0, ).count() - if yesterday_total == 0: - return ( - f"**物业工单日报 — {date_str}({weekday})**\n\n" - f">昨日工单总计:**0** 条\n" - f">系统运行正常,无工单" - ) + current_open_orders = db.query(IotOpsOrder).filter( + IotOpsOrder.deleted == 0, + IotOpsOrder.status.notin_(("COMPLETED", "CANCELLED")), + ).all() + + if not yesterday_orders and not current_open_orders: + return { + "date_str": date_str, + "weekday": weekday, + "empty": True, + "title": f"物业工单日报 {date_str}({weekday})", + "subtitle": "昨日系统运行平稳", + "overview": "昨日无新增工单,当前无待处理工单。", + "summary": { + "yesterday_total": 0, + "completed_count": 0, + "backlog_count": 0, + "avg_resp": "暂无数据", + "avg_close": "暂无数据", + "false_alarm_rate": "0%", + }, + "risk_lines": [ + "安保高发:暂无数据", + "高发区域:暂无数据", + "高发摄像头:暂无数据", + "超时未处理:0 条", + ], + "change_str": "前日无工单", + "top_overdue": [], + } - # 收集 order_ids order_ids = [o.id for o in yesterday_orders] - - # 批量查安保扩展 sec_ext_map = {} - sec_exts = db.query(IotOpsOrderSecurityExt).filter( - IotOpsOrderSecurityExt.ops_order_id.in_(order_ids), - IotOpsOrderSecurityExt.deleted == 0, - ).all() - sec_ext_map = {e.ops_order_id: e for e in sec_exts} - - # 批量查保洁扩展 clean_ext_map = {} - clean_exts = db.query(IotOpsOrderCleanExt).filter( - IotOpsOrderCleanExt.ops_order_id.in_(order_ids), - IotOpsOrderCleanExt.deleted == 0, - ).all() - clean_ext_map = {e.ops_order_id: e for e in clean_exts} + if order_ids: + sec_exts = db.query(IotOpsOrderSecurityExt).filter( + IotOpsOrderSecurityExt.ops_order_id.in_(order_ids), + IotOpsOrderSecurityExt.deleted == 0, + ).all() + sec_ext_map = {e.ops_order_id: e for e in sec_exts} + + clean_exts = db.query(IotOpsOrderCleanExt).filter( + IotOpsOrderCleanExt.ops_order_id.in_(order_ids), + IotOpsOrderCleanExt.deleted == 0, + ).all() + clean_ext_map = {e.ops_order_id: e for e in clean_exts} except Exception as e: logger.error(f"查询IoT工单失败: {e}", exc_info=True) @@ -113,47 +168,59 @@ async def generate_daily_report() -> Optional[str]: finally: db.close() - # ---- 统计 ---- type_count = {"SECURITY": 0, "CLEAN": 0} status_count = Counter() alarm_type_count = Counter() - camera_code_counter = Counter() # 先按 camera_code 统计 + area_counter = Counter() + camera_code_counter = Counter() false_alarm_count = 0 response_times: List[float] = [] + close_times: List[float] = [] cleaning_type_count = Counter() - for o in yesterday_orders: - ot = o.order_type or "SECURITY" - type_count[ot] = type_count.get(ot, 0) + 1 - status_count[o.status or "PENDING"] += 1 + for order in yesterday_orders: + order_type = order.order_type or "SECURITY" + type_count[order_type] = type_count.get(order_type, 0) + 1 + status_count[order.status or "PENDING"] += 1 + area_counter[_location_name(order)] += 1 - # 安保统计 - sec_ext = sec_ext_map.get(o.id) + sec_ext = sec_ext_map.get(order.id) if sec_ext: if sec_ext.alarm_type: alarm_type_count[sec_ext.alarm_type] += 1 - # 统一用 camera_id(即 camera_code)做 key,后续批量解析名称 cam_key = sec_ext.camera_id or sec_ext.camera_name if cam_key: camera_code_counter[cam_key] += 1 if sec_ext.false_alarm == 1: false_alarm_count += 1 - # 响应时长:dispatched → confirmed if sec_ext.dispatched_time and sec_ext.confirmed_time: delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0 if 0 <= delta <= 360: response_times.append(delta) + if sec_ext.dispatched_time and sec_ext.completed_time: + delta = (sec_ext.completed_time - sec_ext.dispatched_time).total_seconds() / 60.0 + if 0 <= delta <= 24 * 60: + close_times.append(delta) - # 保洁统计 - clean_ext = clean_ext_map.get(o.id) - if clean_ext and clean_ext.cleaning_type: - cleaning_type_count[clean_ext.cleaning_type] += 1 + clean_ext = clean_ext_map.get(order.id) + if clean_ext: + if clean_ext.cleaning_type: + cleaning_type_count[clean_ext.cleaning_type] += 1 + dispatch_time = clean_ext.first_dispatched_time or clean_ext.dispatched_time + if dispatch_time and clean_ext.arrived_time: + delta = (clean_ext.arrived_time - dispatch_time).total_seconds() / 60.0 + if 0 <= delta <= 360: + response_times.append(delta) + if dispatch_time and clean_ext.completed_time: + delta = (clean_ext.completed_time - dispatch_time).total_seconds() / 60.0 + if 0 <= delta <= 24 * 60: + close_times.append(delta) - # 批量解析摄像头名称(camera_code → 真实名称) camera_counter = Counter() if camera_code_counter: try: from app.services.camera_name_service import get_camera_name_service + cam_svc = get_camera_name_service() name_map = await cam_svc.get_display_names_batch(list(camera_code_counter.keys())) for code, count in camera_code_counter.items(): @@ -163,78 +230,124 @@ async def generate_daily_report() -> Optional[str]: logger.warning(f"摄像头名称解析失败,降级使用代码: {e}") camera_counter = camera_code_counter - # 环比 - if prev_total > 0: - change_pct = (yesterday_total - prev_total) / prev_total * 100 - if change_pct > 0: - change_str = f"前日{prev_total}条,↑{change_pct:.1f}%" - elif change_pct < 0: - change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%" - else: - change_str = f"前日{prev_total}条,持平" - else: - change_str = "前日无工单" - - # 平均响应时长 - resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据" - - # 待处理数量 - pending_count = sum( - 1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED") - ) + backlog_count = len(current_open_orders) + carry_over_count = sum(1 for order in current_open_orders if order.create_time and order.create_time < today_start) completed_count = status_count.get("COMPLETED", 0) - cancelled_count = status_count.get("CANCELLED", 0) + avg_resp = _format_duration(_safe_avg(response_times)) + avg_close = _format_duration(_safe_avg(close_times)) + false_alarm_rate = _format_ratio(false_alarm_count, type_count.get("SECURITY", 0)) - # ==================== 组装 Markdown ==================== + overdue_orders = sorted( + ( + order for order in current_open_orders + if order.create_time and order.create_time < today_start + ), + key=lambda item: item.create_time, + ) + top_overdue = [] + for order in overdue_orders[:3]: + age_minutes = max(int((now - order.create_time).total_seconds() / 60), 0) + top_overdue.append( + f"{order.order_code or order.id}|{_location_name(order)}|挂起 {age_minutes} 分钟" + ) + + report = { + "date_str": date_str, + "weekday": weekday, + "empty": False, + "title": f"物业工单日报 {date_str}({weekday})", + "subtitle": "昨日运营概览", + "overview": f"昨日新增 {yesterday_total}|完成 {completed_count}|当前待处理 {backlog_count}", + "change_str": _format_change(yesterday_total, prev_total), + "summary": { + "yesterday_total": yesterday_total, + "completed_count": completed_count, + "backlog_count": backlog_count, + "security_count": type_count.get("SECURITY", 0), + "clean_count": type_count.get("CLEAN", 0), + "avg_resp": avg_resp, + "avg_close": avg_close, + "false_alarm_rate": false_alarm_rate, + "carry_over_count": carry_over_count, + "cancelled_count": status_count.get("CANCELLED", 0), + }, + "risk_lines": [ + f"安保高发:{_top_summary(alarm_type_count, ALARM_TYPE_NAMES)}", + f"高发区域:{_top_summary(area_counter)}", + f"高发摄像头:{_top_summary(camera_counter)}", + f"超时未处理:{carry_over_count} 条", + ], + "tops": { + "alarm_types": _top_summary(alarm_type_count, ALARM_TYPE_NAMES), + "areas": _top_summary(area_counter), + "cameras": _top_summary(camera_counter), + "cleaning_types": _top_summary(cleaning_type_count, CLEANING_TYPE_NAMES), + }, + "top_overdue": top_overdue, + } + return report + + +def _build_preview_text(report: Dict) -> str: + if report.get("empty"): + return ( + f"**{report['title']}**\n\n" + f">昨日新增:0 条\n" + f">当前待处理:0 条\n" + f">系统运行平稳" + ) + + summary = report["summary"] lines = [ - f"**物业工单日报 — {date_str}({weekday})**", + f"**{report['title']}**", "", - f">昨日工单总计:{yesterday_total} 条({change_str})", + f">昨日新增:{summary['yesterday_total']} 条({report['change_str']})", + f">昨日完成:{summary['completed_count']} 条 | 当前待处理:{summary['backlog_count']} 条", + f">安保:{summary['security_count']} 条 | 保洁:{summary['clean_count']} 条", + f">平均首响:{summary['avg_resp']} | 平均完结:{summary['avg_close']}", + f">误报率:{summary['false_alarm_rate']} | 遗留待处理:{summary['carry_over_count']} 条", + "", + "**重点风险**", ] - - # 按工单类型 - sec_count = type_count.get("SECURITY", 0) - clean_count = type_count.get("CLEAN", 0) - if sec_count and clean_count: - lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}条") - elif sec_count: - lines.append(f">安保工单:{sec_count}条") - elif clean_count: - lines.append(f">保洁工单:{clean_count}条") - - lines.append(f">待处理:{pending_count} 条 | " - f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}条") - lines.append(f">平均响应:{resp_str}") - - # 安保告警类型分布 - if alarm_type_count: + lines.extend(f">{line}" for line in report["risk_lines"]) + if report["top_overdue"]: lines.append("") - lines.append("**安保告警类型分布**") - for alarm_type, count in alarm_type_count.most_common(): - type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) - lines.append(f">{type_name}:{count}条") - - # 保洁类型分布 - if cleaning_type_count: - lines.append("") - lines.append("**保洁类型分布**") - for ct, count in cleaning_type_count.most_common(): - ct_name = CLEANING_TYPE_NAMES.get(ct, ct) - lines.append(f">{ct_name}:{count}条") - - # 摄像头 Top5 - top5_cameras = camera_counter.most_common(5) - if top5_cameras: - lines.append("") - lines.append("**告警摄像头 Top5**") - for i, (cam_name, count) in enumerate(top5_cameras, 1): - lines.append(f">{i}. {cam_name} — {count}条") - + lines.append("**需优先跟进**") + lines.extend(f">{idx}. {item}" for idx, item in enumerate(report["top_overdue"], start=1)) return "\n".join(lines) +def _build_report_news(report: Dict) -> Dict: + summary = report["summary"] + click_url = settings.wechat.service_base_url or "https://work.weixin.qq.com" + description = ( + f"{report['overview']}\n" + f"首响 {summary['avg_resp']}|完结 {summary['avg_close']}|误报率 {summary['false_alarm_rate']}\n" + f"高发区域:{report['tops']['areas']}" + ) + return { + "title": report["title"], + "description": description, + "url": click_url, + "picurl": REPORT_COVER_IMAGE_URL, + } + + +async def generate_daily_report() -> Optional[str]: + report = await _build_daily_report_data() + if not report: + return None + return _build_preview_text(report) + + +async def generate_daily_report_news() -> Optional[Dict]: + report = await _build_daily_report_data() + if not report: + return None + return _build_report_news(report) + + async def _send_daily_report(): - """生成并发送日报""" from app.services.wechat_service import get_wechat_service chat_id = settings.wechat.group_chat_id @@ -243,23 +356,38 @@ async def _send_daily_report(): return try: - content = await generate_daily_report() - if not content: + news = await generate_daily_report_news() + preview = await generate_daily_report() + if not news or not preview: logger.info("日报生成内容为空,跳过发送") return wechat_svc = get_wechat_service() - ok = await wechat_svc.send_group_markdown(chat_id, content) + ok = await wechat_svc.send_group_news( + chat_id=chat_id, + title=news["title"], + description=news["description"], + url=news["url"], + picurl=news["picurl"], + ) if ok: - logger.info("日报已发送到企微群聊") + await asyncio.sleep(1) + detail_ok = await wechat_svc.send_group_markdown(chat_id, preview) + if detail_ok: + logger.info("日报图文+摘要已发送到企微群聊") + else: + logger.warning("日报图文已发送,但摘要markdown发送失败") else: - logger.error("日报发送失败") + fallback_ok = await wechat_svc.send_group_markdown(chat_id, preview) + if fallback_ok: + logger.info("日报图文发送失败,已降级为markdown发送") + else: + logger.error("日报发送失败") except Exception: logger.exception("日报生成或发送异常") def _seconds_until(hour: int, minute: int) -> float: - """计算距离下一个 HH:MM 的秒数""" from app.utils.timezone import beijing_now now = beijing_now() @@ -270,7 +398,6 @@ def _seconds_until(hour: int, minute: int) -> float: async def start_daily_report_scheduler(): - """日报定时调度主循环""" hour = settings.daily_report.send_hour minute = settings.daily_report.send_minute logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送") @@ -281,7 +408,6 @@ async def start_daily_report_scheduler(): logger.debug(f"日报下次发送倒计时 {wait:.0f} 秒") await asyncio.sleep(wait) await _send_daily_report() - # 发送完等 61 秒,避免同一分钟内重复触发 await asyncio.sleep(61) except asyncio.CancelledError: logger.info("日报定时任务已停止")