Files
iot-device-management-service/app/services/daily_report_service.py
16337 63a8d5a8f2 告警-工单解耦:企微交互+Agent全面切换到工单驱动
Part A: 数据层
- 新增 WechatCardState 模型(order_id ↔ alarm_id 映射 + response_code)
- 新建 models_iot.py(IoT 工单只读 ORM:ops_order + security_ext + clean_ext)
- config.py 新增 IOT_DATABASE_URL 配置

Part B: 企微解耦(alarm_id → order_id)
- wechat_service: response_code 存储迁移到 wechat_card_state,集中 helper
- 卡片发送/更新方法改用 order_id,按钮 key: confirm_{order_id}
- wechat_callback: 按钮解析改 order_id,反查 alarm_id(可空)
- wechat_notify_api: send-card/sync-status 以 orderId 为主键
- yudao_aiot_alarm: 卡片操作改用 order_id,删重复 helper

Part C: Agent 工具全面改为工单驱动
- 新建 order_query.py(查 IoT ops_order,支持安保+保洁工单)
- 新建 order_action.py(操作工单状态 + 提交处理结果)
- 更新 prompts.py 为工单助手
- 更新工具注册(__init__.py)

Part D: 日报改为工单驱动
- daily_report_service 从查 alarm_event 改为查 IoT ops_order + 扩展表
- 支持安保+保洁工单统计
2026-03-31 10:49:42 +08:00

274 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
每日工单日报定时推送服务
每天定时生成前一天的工单汇总,发送到企微群聊。
数据源IoT ops_order + 安保/保洁扩展表。
"""
import asyncio
from collections import Counter, defaultdict
from datetime import timedelta
from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
}
# 保洁类型映射
CLEANING_TYPE_NAMES = {
"ROUTINE": "日常保洁", "DEEP": "深度保洁",
"SPOT": "点状保洁", "EMERGENCY": "应急保洁",
}
# 工单状态映射
ORDER_STATUS_NAMES = {
"PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
"PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
}
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
def _format_resp_time(minutes: float) -> str:
"""格式化响应时长"""
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
async def generate_daily_report() -> Optional[str]:
"""生成昨日工单日报 Markdown 内容"""
from app.utils.timezone import beijing_now
now = beijing_now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_start = today_start - timedelta(days=1)
day_before_start = today_start - timedelta(days=2)
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
# 查询 IoT 工单
try:
from app.models_iot import (
get_iot_session, IotOpsOrder,
IotOpsOrderSecurityExt, IotOpsOrderCleanExt,
)
except Exception as e:
logger.error(f"IoT数据库不可用日报生成失败: {e}")
return None
db = get_iot_session()
try:
# 昨日工单
yesterday_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= yesterday_start,
IotOpsOrder.create_time < today_start,
IotOpsOrder.deleted == 0,
).all()
yesterday_total = len(yesterday_orders)
# 前日工单(用于环比)
prev_total = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= day_before_start,
IotOpsOrder.create_time < yesterday_start,
IotOpsOrder.deleted == 0,
).count()
if yesterday_total == 0:
return (
f"**物业工单日报 — {date_str}{weekday}**\n\n"
f">昨日工单总计:**0** 条\n"
f">系统运行正常,无工单"
)
# 收集 order_ids
order_ids = [o.id for o in yesterday_orders]
# 批量查安保扩展
sec_ext_map = {}
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
# 批量查保洁扩展
clean_ext_map = {}
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
except Exception as e:
logger.error(f"查询IoT工单失败: {e}", exc_info=True)
return None
finally:
db.close()
# ---- 统计 ----
type_count = {"SECURITY": 0, "CLEAN": 0}
status_count = Counter()
alarm_type_count = Counter()
camera_counter = Counter()
false_alarm_count = 0
response_times: List[float] = []
cleaning_type_count = Counter()
for o in yesterday_orders:
ot = o.order_type or "SECURITY"
type_count[ot] = type_count.get(ot, 0) + 1
status_count[o.status or "PENDING"] += 1
# 安保统计
sec_ext = sec_ext_map.get(o.id)
if sec_ext:
if sec_ext.alarm_type:
alarm_type_count[sec_ext.alarm_type] += 1
if sec_ext.camera_name:
camera_counter[sec_ext.camera_name] += 1
elif sec_ext.camera_id:
camera_counter[sec_ext.camera_id] += 1
if sec_ext.false_alarm == 1:
false_alarm_count += 1
# 响应时长dispatched → confirmed
if sec_ext.dispatched_time and sec_ext.confirmed_time:
delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
# 保洁统计
clean_ext = clean_ext_map.get(o.id)
if clean_ext and clean_ext.cleaning_type:
cleaning_type_count[clean_ext.cleaning_type] += 1
# 环比
if prev_total > 0:
change_pct = (yesterday_total - prev_total) / prev_total * 100
if change_pct > 0:
change_str = f"前日{prev_total}条,↑{change_pct:.1f}%"
elif change_pct < 0:
change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
else:
change_str = f"前日{prev_total}条,持平"
else:
change_str = "前日无工单"
# 平均响应时长
resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
# 待处理数量
pending_count = sum(
1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED")
)
completed_count = status_count.get("COMPLETED", 0)
cancelled_count = status_count.get("CANCELLED", 0)
# ==================== 组装 Markdown ====================
lines = [
f"**物业工单日报 — {date_str}{weekday}**",
"",
f">昨日工单总计:<font color=\"warning\">{yesterday_total}</font> 条({change_str}",
]
# 按工单类型
sec_count = type_count.get("SECURITY", 0)
clean_count = type_count.get("CLEAN", 0)
if sec_count and clean_count:
lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}")
elif sec_count:
lines.append(f">安保工单:{sec_count}")
elif clean_count:
lines.append(f">保洁工单:{clean_count}")
lines.append(f">待处理:<font color=\"warning\">{pending_count}</font> 条 | "
f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}")
lines.append(f">平均响应:<font color=\"info\">{resp_str}</font>")
# 安保告警类型分布
if alarm_type_count:
lines.append("")
lines.append("**安保告警类型分布**")
for alarm_type, count in alarm_type_count.most_common():
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
lines.append(f">{type_name}{count}")
# 保洁类型分布
if cleaning_type_count:
lines.append("")
lines.append("**保洁类型分布**")
for ct, count in cleaning_type_count.most_common():
ct_name = CLEANING_TYPE_NAMES.get(ct, ct)
lines.append(f">{ct_name}{count}")
# 摄像头 Top5
top5_cameras = camera_counter.most_common(5)
if top5_cameras:
lines.append("")
lines.append("**告警摄像头 Top5**")
for i, (cam_name, count) in enumerate(top5_cameras, 1):
lines.append(f">{i}. {cam_name}{count}")
return "\n".join(lines)
async def _send_daily_report():
"""生成并发送日报"""
from app.services.wechat_service import get_wechat_service
chat_id = settings.wechat.group_chat_id
if not chat_id:
logger.warning("日报发送跳过:未配置 group_chat_id")
return
try:
content = await generate_daily_report()
if not content:
logger.info("日报生成内容为空,跳过发送")
return
wechat_svc = get_wechat_service()
ok = await wechat_svc.send_group_markdown(chat_id, content)
if ok:
logger.info("日报已发送到企微群聊")
else:
logger.error("日报发送失败")
except Exception:
logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
"""计算距离下一个 HH:MM 的秒数"""
from app.utils.timezone import beijing_now
now = beijing_now()
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
return (target - now).total_seconds()
async def start_daily_report_scheduler():
"""日报定时调度主循环"""
hour = settings.daily_report.send_hour
minute = settings.daily_report.send_minute
logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
try:
while True:
wait = _seconds_until(hour, minute)
logger.debug(f"日报下次发送倒计时 {wait:.0f}")
await asyncio.sleep(wait)
await _send_daily_report()
# 发送完等 61 秒,避免同一分钟内重复触发
await asyncio.sleep(61)
except asyncio.CancelledError:
logger.info("日报定时任务已停止")