Files
iot-device-management-service/app/services/daily_report_service.py
16337 5309b5a7ce 重构:建立术语注册中心 constants.py(阶段一)
新建 app/constants.py 作为全局术语单一真相源,包含:
- AlarmType/AlarmStatus/HandleStatus/OrderStatus/CleaningType 枚举
- 所有中文映射字典(ALARM_TYPE_NAMES 等)
- 芋道前端兼容状态映射
- 告警等级、优先级、保洁类型等常量
- ORDER_OPEN_STATUSES 未完成状态集合

已替换 daily_report_service.py 和 order_query.py 中的重复定义。
其余文件(wechat_service/vlm_service/yudao_*等)待下一阶段替换。
2026-04-07 11:29:44 +08:00

496 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

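"""
Scheduled push service for the daily work-order report.
Every day at the configured time it builds a summary of the previous day's work orders and sends it to a WeCom group chat.
Data source: IoT ops_order plus the security / cleaning extension tables.
"""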
import asyncio
from collections import Counter
from datetime import timedelta
from typing import Dict, List, Optional
from app.config import settings
from app.utils.logger import logger
from app.constants import ALARM_TYPE_NAMES, CLEANING_TYPE_NAMES, WEEKDAY_NAMES
def _format_duration(minutes: float) -> str:
if minutes <= 0:
return "暂无数据"
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
def _format_age(minutes: int) -> str:
"""把分钟数格式化为人类友好的时长"""
if minutes < 60:
return f"{minutes}分钟"
hours = minutes / 60
if hours < 24:
return f"{hours:.1f}小时"
days = hours / 24
return f"{days:.1f}"
def _format_ratio(numerator: int, denominator: int) -> str:
if denominator <= 0:
return "0%"
return f"{(numerator / denominator) * 100:.1f}%"
def _format_change(current: int, previous: int) -> str:
if previous <= 0:
return "前日无工单"
change_pct = (current - previous) / previous * 100
if change_pct > 0:
return f"前日{previous}条,↑{change_pct:.1f}%"
if change_pct < 0:
return f"前日{previous}条,↓{abs(change_pct):.1f}%"
return f"前日{previous}条,持平"
def _top_summary(counter: Counter, mapping: Optional[Dict[str, str]] = None, top_n: int = 3, max_len: int = 0) -> str:
"""汇总 Counter 前 N 名。max_len > 0 时截断每个名称。"""
if not counter:
return "暂无数据"
parts = []
for key, count in counter.most_common(top_n):
name = mapping.get(key, key) if mapping else key
if max_len and len(name) > max_len:
name = name[:max_len] + ".."
parts.append(f"{name} {count}")
return "".join(parts)
def _location_name(order) -> str:
if getattr(order, "location", None):
return order.location
if getattr(order, "area_id", None):
return f"区域{order.area_id}"
return "未标注区域"
def _safe_avg(values: List[float]) -> float:
return sum(values) / len(values) if values else 0.0
async def _build_daily_report_data() -> Optional[Dict]:
    """Collect yesterday's work-order statistics into a report dict.

    Queries the IoT order table for orders created yesterday, the order
    count of the day before, and every currently open order, then joins the
    security / cleaning extension rows of yesterday's orders to derive type
    counts, response / close times, false-alarm rate and hot spots.

    Returns:
        A dict with ``title``, ``overview``, ``change_str``, ``summary``,
        ``risk_lines``, ``tops`` and ``top_overdue`` (plus ``empty=True``
        when there is neither a new order nor an open one), or ``None``
        when the IoT models cannot be imported or the query fails.
    """
    from app.utils.timezone import beijing_now

    now = beijing_now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_start = today_start - timedelta(days=1)
    day_before_start = today_start - timedelta(days=2)
    date_str = yesterday_start.strftime("%m-%d")
    weekday = WEEKDAY_NAMES[yesterday_start.weekday()]

    try:
        from app.models_iot import (
            get_iot_session,
            IotOpsOrder,
            IotOpsOrderSecurityExt,
            IotOpsOrderCleanExt,
        )
    except Exception as e:
        logger.error(f"IoT数据库不可用日报生成失败: {e}")
        return None

    db = get_iot_session()
    try:
        yesterday_orders = db.query(IotOpsOrder).filter(
            IotOpsOrder.create_time >= yesterday_start,
            IotOpsOrder.create_time < today_start,
            IotOpsOrder.deleted == 0,
        ).all()
        yesterday_total = len(yesterday_orders)

        prev_total = db.query(IotOpsOrder).filter(
            IotOpsOrder.create_time >= day_before_start,
            IotOpsOrder.create_time < yesterday_start,
            IotOpsOrder.deleted == 0,
        ).count()

        # Open orders are not limited to yesterday: anything not yet
        # completed or cancelled counts as backlog.
        current_open_orders = db.query(IotOpsOrder).filter(
            IotOpsOrder.deleted == 0,
            IotOpsOrder.status.notin_(("COMPLETED", "CANCELLED")),
        ).all()

        if not yesterday_orders and not current_open_orders:
            # The empty report carries the same summary/tops keys as a
            # regular one so the card and markdown builders can consume it
            # without special-casing (the card builder reads them
            # unconditionally).
            return {
                "date_str": date_str,
                "weekday": weekday,
                "empty": True,
                "title": f"物业工单日报 {date_str}{weekday}",
                "subtitle": "昨日系统运行平稳",
                "overview": "昨日无新增工单,当前无待处理工单。",
                "summary": {
                    "yesterday_total": 0,
                    "completed_count": 0,
                    "backlog_count": 0,
                    "security_count": 0,
                    "clean_count": 0,
                    "avg_resp": "暂无数据",
                    "avg_close": "暂无数据",
                    "false_alarm_rate": "0%",
                    "carry_over_count": 0,
                    "cancelled_count": 0,
                },
                "risk_lines": [
                    "安保高发:暂无数据",
                    "高发区域:暂无数据",
                    "高发摄像头:暂无数据",
                    "超时未处理0 条",
                ],
                "tops": {
                    "alarm_types": "暂无数据",
                    "areas": "暂无数据",
                    "cameras": "暂无数据",
                    "cleaning_types": "暂无数据",
                    "cameras_short": "暂无数据",
                    "alarm_types_short": "暂无数据",
                },
                "change_str": "前日无工单",
                "top_overdue": [],
            }

        order_ids = [o.id for o in yesterday_orders]
        sec_ext_map = {}
        clean_ext_map = {}
        if order_ids:
            sec_exts = db.query(IotOpsOrderSecurityExt).filter(
                IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
                IotOpsOrderSecurityExt.deleted == 0,
            ).all()
            sec_ext_map = {e.ops_order_id: e for e in sec_exts}
            clean_exts = db.query(IotOpsOrderCleanExt).filter(
                IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
                IotOpsOrderCleanExt.deleted == 0,
            ).all()
            clean_ext_map = {e.ops_order_id: e for e in clean_exts}
    except Exception as e:
        logger.error(f"查询IoT工单失败: {e}", exc_info=True)
        return None
    finally:
        db.close()

    type_count = {"SECURITY": 0, "CLEAN": 0}
    status_count = Counter()
    alarm_type_count = Counter()
    area_counter = Counter()
    camera_code_counter = Counter()
    false_alarm_count = 0
    response_times: List[float] = []
    close_times: List[float] = []
    cleaning_type_count = Counter()

    for order in yesterday_orders:
        order_type = order.order_type or "SECURITY"
        type_count[order_type] = type_count.get(order_type, 0) + 1
        status_count[order.status or "PENDING"] += 1
        area_counter[_location_name(order)] += 1

        sec_ext = sec_ext_map.get(order.id)
        if sec_ext:
            if sec_ext.alarm_type:
                alarm_type_count[sec_ext.alarm_type] += 1
            cam_key = sec_ext.camera_id or sec_ext.camera_name
            if cam_key:
                camera_code_counter[cam_key] += 1
            if sec_ext.false_alarm == 1:
                false_alarm_count += 1
            # Response samples outside 0..360 min and close samples outside
            # 0..24 h are dropped before averaging.
            if sec_ext.dispatched_time and sec_ext.confirmed_time:
                delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
                if 0 <= delta <= 360:
                    response_times.append(delta)
            if sec_ext.dispatched_time and sec_ext.completed_time:
                delta = (sec_ext.completed_time - sec_ext.dispatched_time).total_seconds() / 60.0
                if 0 <= delta <= 24 * 60:
                    close_times.append(delta)

        clean_ext = clean_ext_map.get(order.id)
        if clean_ext:
            if clean_ext.cleaning_type:
                cleaning_type_count[clean_ext.cleaning_type] += 1
            dispatch_time = clean_ext.first_dispatched_time or clean_ext.dispatched_time
            if dispatch_time and clean_ext.arrived_time:
                delta = (clean_ext.arrived_time - dispatch_time).total_seconds() / 60.0
                if 0 <= delta <= 360:
                    response_times.append(delta)
            if dispatch_time and clean_ext.completed_time:
                delta = (clean_ext.completed_time - dispatch_time).total_seconds() / 60.0
                if 0 <= delta <= 24 * 60:
                    close_times.append(delta)

    # Re-key the camera counter by display name; on any failure fall back
    # to the raw codes so the report can still be produced.
    camera_counter = Counter()
    if camera_code_counter:
        try:
            from app.services.camera_name_service import get_camera_name_service

            cam_svc = get_camera_name_service()
            name_map = await cam_svc.get_display_names_batch(list(camera_code_counter.keys()))
            for code, count in camera_code_counter.items():
                display_name = name_map.get(code, code)
                camera_counter[display_name] += count
        except Exception as e:
            logger.warning(f"摄像头名称解析失败,降级使用代码: {e}")
            camera_counter = camera_code_counter

    backlog_count = len(current_open_orders)
    completed_count = status_count.get("COMPLETED", 0)
    avg_resp = _format_duration(_safe_avg(response_times))
    avg_close = _format_duration(_safe_avg(close_times))
    false_alarm_rate = _format_ratio(false_alarm_count, type_count.get("SECURITY", 0))

    # Open orders created before today, oldest first. Their number is the
    # carry-over count, so the filter is evaluated only once.
    overdue_orders = sorted(
        (
            order for order in current_open_orders
            if order.create_time and order.create_time < today_start
        ),
        key=lambda item: item.create_time,
    )
    carry_over_count = len(overdue_orders)

    top_overdue = []
    for order in overdue_orders[:3]:
        age_minutes = max(int((now - order.create_time).total_seconds() / 60), 0)
        order_type_label = "安保" if order.order_type == "SECURITY" else "保洁"
        top_overdue.append(
            f"{_location_name(order)}{order_type_label},已挂起{_format_age(age_minutes)}"
        )

    report = {
        "date_str": date_str,
        "weekday": weekday,
        "empty": False,
        "title": f"物业工单日报 {date_str}{weekday}",
        "subtitle": "昨日运营概览",
        "overview": f"昨日新增 {yesterday_total}|完成 {completed_count}|当前待处理 {backlog_count}",
        "change_str": _format_change(yesterday_total, prev_total),
        "summary": {
            "yesterday_total": yesterday_total,
            "completed_count": completed_count,
            "backlog_count": backlog_count,
            "security_count": type_count.get("SECURITY", 0),
            "clean_count": type_count.get("CLEAN", 0),
            "avg_resp": avg_resp,
            "avg_close": avg_close,
            "false_alarm_rate": false_alarm_rate,
            "carry_over_count": carry_over_count,
            "cancelled_count": status_count.get("CANCELLED", 0),
        },
        "risk_lines": [
            f"安保高发:{_top_summary(alarm_type_count, ALARM_TYPE_NAMES)}",
            f"高发区域:{_top_summary(area_counter)}",
            f"高发摄像头:{_top_summary(camera_counter)}",
            f"超时未处理:{carry_over_count}",
        ],
        "tops": {
            "alarm_types": _top_summary(alarm_type_count, ALARM_TYPE_NAMES),
            "areas": _top_summary(area_counter),
            "cameras": _top_summary(camera_counter),
            "cleaning_types": _top_summary(cleaning_type_count, CLEANING_TYPE_NAMES),
            # Card-only variants: truncated names, top 2 only, to avoid overflow.
            "cameras_short": _top_summary(camera_counter, top_n=2, max_len=8),
            "alarm_types_short": _top_summary(alarm_type_count, ALARM_TYPE_NAMES, top_n=2),
        },
        "top_overdue": top_overdue,
    }
    return report
def _build_template_card(report: Dict) -> Dict:
"""构建 text_notice 模板卡片(群机器人 Webhook 专用)"""
s = report["summary"]
tops = report["tops"]
click_url = settings.wechat.service_base_url or "https://work.weixin.qq.com"
# 大号数字
emphasis_desc = f"昨日新增({report['change_str']}"
# 待处理文案
if s["backlog_count"] == 0:
pending_val = "0 ✅ 全部清零"
elif s["carry_over_count"] > 0:
pending_val = f"{s['backlog_count']}(遗留{s['carry_over_count']}"
else:
pending_val = str(s["backlog_count"])
# 键值对(最多 6 条,用短名称防截断)
kv_list = [
{"keyname": "安保 / 保洁", "value": f"{s['security_count']} / {s['clean_count']}"},
{"keyname": "已完成 / 待处理", "value": f"{s['completed_count']} / {pending_val}"},
{"keyname": "首响 / 完结", "value": f"{s['avg_resp']} / {s['avg_close']}"},
]
if tops["alarm_types_short"] != "暂无数据":
kv_list.append({"keyname": "告警热点", "value": tops["alarm_types_short"]})
if tops["cameras_short"] != "暂无数据":
kv_list.append({"keyname": "高发设备", "value": tops["cameras_short"]})
if s["false_alarm_rate"] != "0%":
kv_list.append({"keyname": "误报率", "value": s["false_alarm_rate"]})
# 副标题(一句话总结)
if s["backlog_count"] == 0:
sub_title = "昨日工单全部清零,运营状态良好"
elif report["top_overdue"]:
sub_title = f"⚠ 需关注:{report['top_overdue'][0]}"
else:
sub_title = f"当前 {s['backlog_count']} 条待处理"
card = {
"card_type": "text_notice",
"source": {
"desc": "VSP物业平台",
"desc_color": 0,
},
"main_title": {
"title": report["title"],
},
"emphasis_content": {
"title": str(s["yesterday_total"]),
"desc": emphasis_desc,
},
"sub_title_text": sub_title,
"horizontal_content_list": kv_list,
"jump_list": [
{
"type": 2,
"title": "点击查看详情",
"appid": "wxb3dc42bb3017c3f2",
"pagepath": "pages/index/index",
},
],
"card_action": {
"type": 2,
"appid": "wxb3dc42bb3017c3f2",
"pagepath": "pages/index/index",
},
}
return card
def _build_markdown(report: Dict) -> str:
"""构建单条企微 markdown 日报(降级方案)"""
if report.get("empty"):
return (
f"**{report['title']}**\n\n"
f">昨日系统运行平稳,无新增工单\n"
f">当前无待处理事项"
)
s = report["summary"]
backlog = s["backlog_count"]
lines = [
f"**{report['title']}**",
"",
f">昨日新增 <font color=\"warning\">{s['yesterday_total']}</font> 条({report['change_str']}",
f">安保 {s['security_count']}|保洁 {s['clean_count']}"
f"已完成 {s['completed_count']}|误报 {s['false_alarm_rate']}",
]
if backlog == 0:
lines.append(f">待处理 <font color=\"info\">0</font> 条,全部清零")
else:
lines.append(
f">待处理 <font color=\"warning\">{backlog}</font> 条"
f"(其中遗留 {s['carry_over_count']}"
)
lines.append("")
lines.append(
f">响应效率:首响 <font color=\"info\">{s['avg_resp']}</font>"
f"|完结 <font color=\"info\">{s['avg_close']}</font>"
)
tops = report["tops"]
risk_items = []
if tops["alarm_types"] != "暂无数据":
risk_items.append(f">告警类型|{tops['alarm_types']}")
if tops["cleaning_types"] != "暂无数据":
risk_items.append(f">保洁类型|{tops['cleaning_types']}")
if tops["areas"] != "暂无数据":
risk_items.append(f">高发区域|{tops['areas']}")
if tops["cameras"] != "暂无数据":
risk_items.append(f">高发设备|{tops['cameras']}")
if risk_items:
lines.append("")
lines.append("**热点分布**")
lines.extend(risk_items)
if report["top_overdue"]:
lines.append("")
lines.append(f"**需关注({s['carry_over_count']}条超时)**")
for item in report["top_overdue"]:
lines.append(f">{item}")
return "\n".join(lines)
async def generate_daily_report() -> Optional[str]:
    """Produce the daily report as markdown (for preview and fallback sending).

    Returns ``None`` when no report data could be built.
    """
    data = await _build_daily_report_data()
    return _build_markdown(data) if data else None
async def _send_daily_report():
    """Send the daily report: webhook template card first, markdown fallback.

    Does nothing (beyond a warning) when neither ``group_chat_id`` nor
    ``group_robot_key`` is configured. Every failure is logged and no
    exception other than cancellation leaves this coroutine, so the
    scheduler loop keeps running.
    """
    from app.services.wechat_service import get_wechat_service

    chat_id = settings.wechat.group_chat_id
    robot_key = settings.wechat.group_robot_key
    if not chat_id and not robot_key:
        logger.warning("日报发送跳过:未配置 group_chat_id 或 group_robot_key")
        return

    try:
        report = await _build_daily_report_data()
        if not report:
            logger.info("日报生成内容为空,跳过发送")
            return

        wechat_svc = get_wechat_service()

        # Preferred: text_notice template card via the group-robot webhook.
        if robot_key:
            try:
                card = _build_template_card(report)
                if await wechat_svc.send_webhook_template_card(robot_key, card):
                    logger.info("日报模板卡片已通过 Webhook 发送")
                    return
                logger.warning("日报模板卡片 Webhook 发送失败")
            except Exception:
                # A card build/send error must not skip the markdown fallback.
                logger.exception("日报模板卡片构建或发送异常")

        # Fallback: markdown to the application group chat.
        if chat_id:
            content = _build_markdown(report)
            if await wechat_svc.send_group_markdown(chat_id, content):
                logger.info("日报已通过 markdown 发送到群聊")
            else:
                logger.error("日报发送失败")
        else:
            # Only reachable after a webhook failure with no fallback
            # channel; previously this case produced no log at all.
            logger.error("日报发送失败:Webhook 未成功且未配置 group_chat_id")
    except Exception:
        logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
    """Return the seconds from now until the next ``hour:minute``.

    "Now" comes from ``beijing_now()``. If today's ``hour:minute`` is
    already reached (or is exactly now), tomorrow's occurrence is used.
    """
    from app.utils.timezone import beijing_now

    current = beijing_now()
    next_run = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if next_run <= current:
        next_run = next_run + timedelta(days=1)
    remaining = next_run - current
    return remaining.total_seconds()
async def start_daily_report_scheduler():
    """Run the daily-report loop until the task is cancelled.

    Reads the send time from ``settings.daily_report`` once, then repeatedly
    sleeps until that time, sends the report and sleeps another 61 seconds
    before computing the next wait. Cancellation is logged and absorbed.
    """
    report_cfg = settings.daily_report
    hour, minute = report_cfg.send_hour, report_cfg.send_minute
    logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
    try:
        while True:
            delay = _seconds_until(hour, minute)
            logger.debug(f"日报下次发送倒计时 {delay:.0f}")
            await asyncio.sleep(delay)
            await _send_daily_report()
            # Sleep past the target minute before computing the next wait.
            await asyncio.sleep(61)
    except asyncio.CancelledError:
        logger.info("日报定时任务已停止")