Files
iot-device-management-service/app/services/agent/tools/order_query.py
16337 5309b5a7ce 重构:建立术语注册中心 constants.py(阶段一)
新建 app/constants.py 作为全局术语单一真相源,包含:
- AlarmType/AlarmStatus/HandleStatus/OrderStatus/CleaningType 枚举
- 所有中文映射字典(ALARM_TYPE_NAMES 等)
- 芋道前端兼容状态映射
- 告警等级、优先级、保洁类型等常量
- ORDER_OPEN_STATUSES 未完成状态集合

已替换 daily_report_service.py 和 order_query.py 中的重复定义。
其余文件(wechat_service/vlm_service/yudao_*等)待下一阶段替换。
2026-04-07 11:29:44 +08:00

342 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
工单查询工具:统计、列表、详情(查 IoT ops_order + 扩展表)
支持安保工单SECURITY和保洁工单CLEAN
"""
import json
from datetime import timedelta
from typing import Optional
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig
from app.utils.logger import logger
from app.utils.timezone import beijing_now
from app.constants import (
ALARM_TYPE_NAMES, ORDER_STATUS_NAMES, PRIORITY_NAMES,
CLEANING_TYPE_NAMES, ORDER_OPEN_STATUSES,
)
def _parse_time_range(time_range: str):
"""解析时间范围,返回 (start_time, label)"""
now = beijing_now()
if time_range == "week":
start = now - timedelta(days=now.weekday())
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
return start, "本周"
elif time_range == "month":
start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
return start, "本月"
elif time_range == "yesterday":
start = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
end = now.replace(hour=0, minute=0, second=0, microsecond=0)
return start, "昨日"
else:
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
return start, "今日"
def _get_camera_display_name(device_id: str) -> str:
"""同步获取摄像头显示名称"""
try:
import asyncio
from app.services.camera_name_service import get_camera_name_service
camera_service = get_camera_name_service()
loop = asyncio.get_event_loop()
if loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
cam_info = pool.submit(
asyncio.run, camera_service.get_camera_info(device_id)
).result(timeout=5)
else:
cam_info = asyncio.run(camera_service.get_camera_info(device_id))
return camera_service.format_display_name(device_id, cam_info)
except Exception:
return device_id
def _query_orders(
order_type: Optional[str] = None,
status: Optional[str] = None,
start_time=None,
end_time=None,
limit: int = 100,
assignee_name: Optional[str] = None,
skip_time_filter: bool = False,
):
"""查询 IoT 工单(跨库只读)"""
from app.models_iot import get_iot_session, IotOpsOrder
from app.config import settings
db = get_iot_session()
try:
q = db.query(IotOpsOrder).filter(IotOpsOrder.deleted == 0)
# 租户隔离
try:
tid = int(settings.work_order.tenant_id)
q = q.filter(IotOpsOrder.tenant_id == tid)
except Exception:
pass
if order_type and order_type != "ALL":
q = q.filter(IotOpsOrder.order_type == order_type)
if status:
q = q.filter(IotOpsOrder.status == status)
if not skip_time_filter:
if start_time:
q = q.filter(IotOpsOrder.create_time >= start_time)
if end_time:
q = q.filter(IotOpsOrder.create_time < end_time)
if assignee_name:
q = q.filter(IotOpsOrder.assignee_name.contains(assignee_name))
total = q.count()
orders = q.order_by(IotOpsOrder.create_time.desc()).limit(limit).all()
# 提取所有 order_id 用于关联查询扩展表
order_ids = [o.id for o in orders]
# 批量查安保扩展
sec_ext_map = {}
if order_ids:
from app.models_iot import IotOpsOrderSecurityExt
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
# 批量查保洁扩展
clean_ext_map = {}
if order_ids:
from app.models_iot import IotOpsOrderCleanExt
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
return orders, total, sec_ext_map, clean_ext_map
except Exception as e:
logger.error(f"查询IoT工单失败: {e}", exc_info=True)
return [], 0, {}, {}
finally:
db.close()
@tool
def query_order_stats(time_range: str = "today", order_type: str = "ALL") -> str:
"""查询工单统计数据(总数、按状态分布、按类型分布)
Args:
time_range: 时间范围 today=今日 week=本周 month=本月 yesterday=昨日
order_type: 工单类型筛选 SECURITY=安保 CLEAN=保洁 ALL=全部
"""
start, range_label = _parse_time_range(time_range)
now = beijing_now()
end = now
if time_range == "yesterday":
end = now.replace(hour=0, minute=0, second=0, microsecond=0)
orders, total, sec_ext_map, clean_ext_map = _query_orders(
order_type=order_type if order_type != "ALL" else None,
start_time=start, end_time=end, limit=10000,
)
# 额外统计当前所有未完成工单(不限时间范围)
pending_orders, pending_total, _, _ = _query_orders(
order_type=order_type if order_type != "ALL" else None,
status="PENDING", limit=10000, skip_time_filter=True,
)
assigned_orders, assigned_total, _, _ = _query_orders(
order_type=order_type if order_type != "ALL" else None,
status="ASSIGNED", limit=10000, skip_time_filter=True,
)
# 按状态统计
status_count = {}
for o in orders:
s = o.status or "PENDING"
status_count[s] = status_count.get(s, 0) + 1
# 按类型统计
type_count = {"SECURITY": 0, "CLEAN": 0}
alarm_type_count = {}
for o in orders:
ot = o.order_type or "SECURITY"
type_count[ot] = type_count.get(ot, 0) + 1
# 安保工单细分告警类型
sec_ext = sec_ext_map.get(o.id)
if sec_ext and sec_ext.alarm_type:
alarm_type_count[sec_ext.alarm_type] = alarm_type_count.get(sec_ext.alarm_type, 0) + 1
# 误报统计(安保特有)
false_alarm_count = sum(
1 for e in sec_ext_map.values() if e.false_alarm == 1
)
result = {
"range": range_label,
"total": total,
"by_status": {ORDER_STATUS_NAMES.get(s, s): c for s, c in status_count.items()},
"by_order_type": {k: v for k, v in type_count.items() if v > 0},
"by_alarm_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in alarm_type_count.items()} if alarm_type_count else {},
"false_alarm_count": false_alarm_count,
"current_pending": pending_total,
"current_assigned": assigned_total,
}
return json.dumps(result, ensure_ascii=False)
@tool
def list_orders(
time_range: str = "today",
order_type: str = "ALL",
status: str = "",
limit: int = 10,
assigned_to_me: bool = False,
config: RunnableConfig = None,
) -> str:
"""查询工单列表,返回最近的工单记录
Args:
time_range: 时间范围 today/week/month/yesterday
order_type: 工单类型 SECURITY=安保 CLEAN=保洁 ALL=全部
status: 状态筛选 PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED
limit: 返回条数默认10最多20
assigned_to_me: 是否只看我的工单
"""
start, range_label = _parse_time_range(time_range)
now = beijing_now()
end = now
if time_range == "yesterday":
end = now.replace(hour=0, minute=0, second=0, microsecond=0)
limit = min(limit, 20)
# 获取当前用户
user_id = ""
if config and assigned_to_me:
user_id = config.get("configurable", {}).get("user_id", "")
# 查待处理工单时不限时间范围(待处理可能是之前创建的)
skip_time = status and status in ORDER_OPEN_STATUSES
orders, total, sec_ext_map, clean_ext_map = _query_orders(
order_type=order_type if order_type != "ALL" else None,
status=status or None,
start_time=start, end_time=end, limit=limit,
skip_time_filter=skip_time,
)
items = []
for o in orders:
create_time = ""
if o.create_time:
try:
create_time = o.create_time.strftime("%m-%d %H:%M")
except Exception:
create_time = str(o.create_time)[:16]
item = {
"order_id": str(o.id),
"order_code": o.order_code or "",
"type": o.order_type or "SECURITY",
"title": o.title or "",
"status": ORDER_STATUS_NAMES.get(o.status, o.status or "待处理"),
"priority": PRIORITY_NAMES.get(o.priority, ""),
"assignee": o.assignee_name or "",
"time": create_time,
}
# 安保扩展
sec_ext = sec_ext_map.get(o.id)
if sec_ext:
item["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "")
item["camera"] = sec_ext.camera_name or _get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else ""
item["false_alarm"] = bool(sec_ext.false_alarm)
# 保洁扩展
clean_ext = clean_ext_map.get(o.id)
if clean_ext:
item["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "")
item["difficulty"] = clean_ext.difficulty_level
items.append(item)
result = {"range": range_label, "total": total, "items": items}
return json.dumps(result, ensure_ascii=False)
@tool
def get_order_detail(order_id: str, config: RunnableConfig) -> str:
"""查询单条工单的详细信息
Args:
order_id: 工单IDops_order.id
"""
from app.models_iot import get_iot_session, IotOpsOrder, IotOpsOrderSecurityExt, IotOpsOrderCleanExt
db = get_iot_session()
try:
order = db.query(IotOpsOrder).filter(
IotOpsOrder.id == int(order_id),
IotOpsOrder.deleted == 0,
).first()
if not order:
return json.dumps({"error": f"未找到工单: {order_id}"}, ensure_ascii=False)
result = {
"order_id": str(order.id),
"order_code": order.order_code or "",
"order_type": order.order_type or "",
"title": order.title or "",
"description": order.description or "",
"status": ORDER_STATUS_NAMES.get(order.status, order.status or ""),
"priority": PRIORITY_NAMES.get(order.priority, ""),
"assignee": order.assignee_name or "",
"location": order.location or "",
"create_time": order.create_time.strftime("%Y-%m-%d %H:%M:%S") if order.create_time else "",
}
# 安保扩展
sec_ext = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id == order.id,
IotOpsOrderSecurityExt.deleted == 0,
).first()
if sec_ext:
result["alarm_id"] = sec_ext.alarm_id or ""
result["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "")
result["camera_name"] = sec_ext.camera_name or (
_get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else ""
)
result["has_image"] = bool(sec_ext.image_url)
result["false_alarm"] = bool(sec_ext.false_alarm)
result["result"] = sec_ext.result or ""
if sec_ext.dispatched_time:
result["dispatched_time"] = sec_ext.dispatched_time.strftime("%Y-%m-%d %H:%M:%S")
if sec_ext.confirmed_time:
result["confirmed_time"] = sec_ext.confirmed_time.strftime("%Y-%m-%d %H:%M:%S")
if sec_ext.completed_time:
result["completed_time"] = sec_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S")
# 保洁扩展
clean_ext = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id == order.id,
IotOpsOrderCleanExt.deleted == 0,
).first()
if clean_ext:
result["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "")
result["difficulty"] = clean_ext.difficulty_level
result["expected_duration"] = clean_ext.expected_duration
result["is_auto"] = bool(clean_ext.is_auto)
if clean_ext.arrived_time:
result["arrived_time"] = clean_ext.arrived_time.strftime("%Y-%m-%d %H:%M:%S")
if clean_ext.completed_time:
result["clean_completed_time"] = clean_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S")
return json.dumps(result, ensure_ascii=False)
except Exception as e:
logger.error(f"查询工单详情失败: {e}", exc_info=True)
return json.dumps({"error": f"查询失败: {e}"}, ensure_ascii=False)
finally:
db.close()