From 63a8d5a8f260f3b38467d3e8cb2604785fbbe4ed Mon Sep 17 00:00:00 2001
From: 16337 <1633794139@qq.com>
Date: Tue, 31 Mar 2026 10:49:42 +0800
Subject: [PATCH] =?UTF-8?q?=E5=91=8A=E8=AD=A6-=E5=B7=A5=E5=8D=95=E8=A7=A3?=
=?UTF-8?q?=E8=80=A6=EF=BC=9A=E4=BC=81=E5=BE=AE=E4=BA=A4=E4=BA=92+Agent?=
=?UTF-8?q?=E5=85=A8=E9=9D=A2=E5=88=87=E6=8D=A2=E5=88=B0=E5=B7=A5=E5=8D=95?=
=?UTF-8?q?=E9=A9=B1=E5=8A=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Part A: 数据层
- 新增 WechatCardState 模型(order_id ↔ alarm_id 映射 + response_code)
- 新建 models_iot.py(IoT 工单只读 ORM:ops_order + security_ext + clean_ext)
- config.py 新增 IOT_DATABASE_URL 配置
Part B: 企微解耦(alarm_id → order_id)
- wechat_service: response_code 存储迁移到 wechat_card_state,集中 helper
- 卡片发送/更新方法改用 order_id,按钮 key: confirm_{order_id}
- wechat_callback: 按钮解析改 order_id,反查 alarm_id(可空)
- wechat_notify_api: send-card/sync-status 以 orderId 为主键
- yudao_aiot_alarm: 卡片操作改用 order_id,删重复 helper
Part C: Agent 工具全面改为工单驱动
- 新建 order_query.py(查 IoT ops_order,支持安保+保洁工单)
- 新建 order_action.py(操作工单状态 + 提交处理结果)
- 更新 prompts.py 为工单助手
- 更新工具注册(__init__.py)
Part D: 日报改为工单驱动
- daily_report_service 从查 alarm_event 改为查 IoT ops_order + 扩展表
- 支持安保+保洁工单统计
---
app/config.py | 2 +
app/models.py | 12 +
app/models_iot.py | 138 ++++++++++
app/routers/wechat_callback.py | 120 +++-----
app/routers/wechat_notify_api.py | 86 +++---
app/routers/yudao_aiot_alarm.py | 40 +--
app/services/agent/prompts.py | 22 +-
app/services/agent/tools/__init__.py | 14 +-
app/services/agent/tools/camera_tools.py | 2 +-
app/services/agent/tools/order_action.py | 182 ++++++++++++
app/services/agent/tools/order_query.py | 334 +++++++++++++++++++++++
app/services/daily_report_service.py | 286 +++++++++----------
app/services/notify_dispatch.py | 13 +-
app/services/wechat_service.py | 111 +++++---
14 files changed, 1019 insertions(+), 343 deletions(-)
create mode 100644 app/models_iot.py
create mode 100644 app/services/agent/tools/order_action.py
create mode 100644 app/services/agent/tools/order_query.py
diff --git a/app/config.py b/app/config.py
index f0d0c92..10e2613 100644
--- a/app/config.py
+++ b/app/config.py
@@ -125,6 +125,7 @@ class CameraNameConfig:
class Settings(BaseModel):
"""全局配置"""
database: DatabaseConfig = DatabaseConfig()
+ iot_database_url: str = "" # IoT 平台数据库(跨库只读查询)
cos: COSConfig = COSConfig()
app: AppConfig = AppConfig()
ai_model: AIModelConfig = AIModelConfig()
@@ -147,6 +148,7 @@ def load_settings() -> Settings:
database=DatabaseConfig(
url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"),
),
+ iot_database_url=os.getenv("IOT_DATABASE_URL", ""),
cos=COSConfig(
region=os.getenv("COS_REGION", "ap-beijing"),
bucket=os.getenv("COS_BUCKET", ""),
diff --git a/app/models.py b/app/models.py
index a7b1c15..3a31708 100644
--- a/app/models.py
+++ b/app/models.py
@@ -568,6 +568,18 @@ class CameraAreaBinding(Base):
created_at = Column(DateTime, default=beijing_now)
+class WechatCardState(Base):
+ """企微卡片状态表(order_id ↔ alarm_id 映射 + response_code)"""
+ __tablename__ = "wechat_card_state"
+
+ order_id = Column(String(64), primary_key=True, comment="IoT工单ID(ops_order.id)")
+ response_code = Column(String(255), nullable=True, comment="企微卡片 response_code")
+ alarm_id = Column(String(64), nullable=True, comment="关联告警ID(可空)")
+ created_at = Column(DateTime, default=lambda: beijing_now())
+ updated_at = Column(DateTime, default=lambda: beijing_now(),
+ onupdate=lambda: beijing_now())
+
+
class AreaPersonBinding(Base):
"""区域-人员通知绑定"""
__tablename__ = "area_person_binding"
diff --git a/app/models_iot.py b/app/models_iot.py
new file mode 100644
index 0000000..b84f094
--- /dev/null
+++ b/app/models_iot.py
@@ -0,0 +1,138 @@
+"""
+IoT 工单只读 ORM 模型
+
+映射 aiot-platform 库的工单表,vsp-service 通过第二个 SQLAlchemy 引擎跨库只读查询。
+"""
+
+from datetime import datetime
+from typing import Optional
+
+from sqlalchemy import (
+ Column, String, Integer, SmallInteger, BigInteger, DateTime, Text,
+ create_engine, Index,
+)
+from sqlalchemy.orm import declarative_base, sessionmaker
+
+from app.config import settings
+from app.utils.logger import logger
+
+IotBase = declarative_base()
+
+
+class IotOpsOrder(IotBase):
+ """IoT 通用工单主表(只读)"""
+ __tablename__ = "ops_order"
+
+ id = Column(BigInteger, primary_key=True)
+ order_code = Column(String(64), comment="工单编号")
+ order_type = Column(String(32), comment="工单类型: SECURITY/CLEAN")
+ title = Column(String(200), comment="工单标题")
+ description = Column(Text, comment="工单描述")
+ priority = Column(SmallInteger, comment="优先级: 0低/1中/2高")
+ status = Column(String(32), comment="状态: PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED")
+ area_id = Column(BigInteger, comment="区域ID")
+ location = Column(String(200), comment="位置")
+ assignee_id = Column(BigInteger, comment="指派人ID")
+ assignee_name = Column(String(100), comment="指派人姓名")
+ start_time = Column(DateTime, comment="开始时间")
+ end_time = Column(DateTime, comment="结束时间")
+ creator = Column(String(64), comment="创建者")
+ create_time = Column(DateTime, comment="创建时间")
+ update_time = Column(DateTime, comment="更新时间")
+ deleted = Column(SmallInteger, default=0, comment="是否删除")
+ tenant_id = Column(BigInteger, default=0, comment="租户编号")
+
+
+class IotOpsOrderSecurityExt(IotBase):
+ """安保工单扩展表(只读)"""
+ __tablename__ = "ops_order_security_ext"
+
+ id = Column(BigInteger, primary_key=True)
+ ops_order_id = Column(BigInteger, nullable=False, comment="工单ID")
+ alarm_id = Column(String(64), comment="关联告警ID")
+ alarm_type = Column(String(32), comment="告警类型")
+ camera_id = Column(String(64), comment="摄像头ID")
+ camera_name = Column(String(128), comment="摄像头名称")
+ roi_id = Column(String(64), comment="ROI区域ID")
+ image_url = Column(String(512), comment="截图URL")
+ assigned_user_id = Column(BigInteger, comment="处理人ID")
+ assigned_user_name = Column(String(100), comment="处理人姓名")
+ result = Column(Text, comment="处理结果")
+ result_img_urls = Column(Text, comment="结果图片URL")
+ false_alarm = Column(SmallInteger, comment="是否误报")
+ dispatched_time = Column(DateTime, comment="派单时间")
+ confirmed_time = Column(DateTime, comment="确认时间")
+ completed_time = Column(DateTime, comment="完成时间")
+ creator = Column(String(64))
+ create_time = Column(DateTime)
+ updater = Column(String(64))
+ update_time = Column(DateTime)
+ deleted = Column(SmallInteger, default=0)
+ tenant_id = Column(BigInteger, default=0)
+
+
+class IotOpsOrderCleanExt(IotBase):
+ """保洁工单扩展表(只读)"""
+ __tablename__ = "ops_order_clean_ext"
+
+ id = Column(BigInteger, primary_key=True)
+ ops_order_id = Column(BigInteger, nullable=False, comment="工单ID")
+ is_auto = Column(SmallInteger, default=1, comment="是否自动工单")
+ expected_duration = Column(Integer, comment="预计作业时长(分钟)")
+ arrived_time = Column(DateTime, comment="实际到岗时间")
+ completed_time = Column(DateTime, comment="实际完成时间")
+ pause_start_time = Column(DateTime, comment="暂停开始时间")
+ pause_end_time = Column(DateTime, comment="暂停结束时间")
+ total_pause_seconds = Column(Integer, default=0, comment="累计暂停时长(秒)")
+ cleaning_type = Column(String(32), comment="保洁类型: ROUTINE/DEEP/SPOT/EMERGENCY")
+ difficulty_level = Column(SmallInteger, comment="难度等级(1-5)")
+ creator = Column(String(64))
+ create_time = Column(DateTime)
+ updater = Column(String(64))
+ update_time = Column(DateTime)
+ deleted = Column(SmallInteger, default=0)
+ tenant_id = Column(BigInteger, default=0)
+ dispatched_time = Column(DateTime, comment="实际下发时间")
+ first_dispatched_time = Column(DateTime, comment="首次下发时间")
+
+
+# ==================== IoT 数据库连接管理 ====================
+
+_iot_engine = None
+_IotSessionLocal = None
+
+
+def get_iot_engine():
+ """获取 IoT 数据库引擎(懒初始化)"""
+ global _iot_engine
+ if _iot_engine is None:
+ iot_url = settings.iot_database_url
+ if not iot_url:
+ raise RuntimeError("IOT_DATABASE_URL 未配置,无法连接 IoT 数据库")
+ _iot_engine = create_engine(
+ iot_url,
+ echo=False,
+ pool_recycle=1800,
+ pool_pre_ping=True,
+ )
+ logger.info(f"IoT 数据库引擎已创建")
+ return _iot_engine
+
+
+def get_iot_session():
+ """获取 IoT 数据库只读 session"""
+ global _IotSessionLocal
+ if _IotSessionLocal is None:
+ _IotSessionLocal = sessionmaker(
+ bind=get_iot_engine(), autocommit=False, autoflush=False,
+ )
+ return _IotSessionLocal()
+
+
+def close_iot_db():
+ """关闭 IoT 数据库连接"""
+ global _iot_engine, _IotSessionLocal
+ if _iot_engine:
+ _iot_engine.dispose()
+ _iot_engine = None
+ _IotSessionLocal = None
diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py
index fc45191..7957118 100644
--- a/app/routers/wechat_callback.py
+++ b/app/routers/wechat_callback.py
@@ -123,16 +123,18 @@ async def _process_card_button_click(msg: dict):
"""
处理模板卡片按钮点击事件(两步状态机)
+ 按钮 key 格式为 {action}_{order_id},以工单ID为主键驱动。
+
第一步按钮:
- - confirm_{alarm_id}: 确认接单 → 更新状态为处理中,创建工单,卡片更新到第二步
- - ignore_{alarm_id}: 误报忽略 → 终态,自动结单
+ - confirm_{order_id}: 确认接单 → 调 IoT /confirm,卡片更新到第二步
+ - ignore_{order_id}: 误报忽略 → 调 IoT /false-alarm,终态
第二步按钮:
- - complete_{alarm_id}: 已处理完成 → 终态,自动结单
- - false_{alarm_id}: 标记误报 → 终态,自动结单
+ - complete_{order_id}: 已处理完成 → 调 IoT /submit,终态
+ - false_{order_id}: 标记误报 → 调 IoT /false-alarm,终态
终态按钮:
- - done_{alarm_id}: 已完成,忽略
+ - done_{order_id}: 已完成,忽略
"""
try:
from app.services.wechat_service import get_wechat_service
@@ -147,19 +149,19 @@ async def _process_card_button_click(msg: dict):
f"卡片按钮点击: user={user_id}, key={event_key}, task={task_id}"
)
- # 解析 action 和 alarm_id
+ # 解析 action 和 order_id
if event_key.startswith("confirm_"):
action = "confirm"
- alarm_id = event_key[len("confirm_"):]
+ order_id = event_key[len("confirm_"):]
elif event_key.startswith("ignore_"):
action = "ignore"
- alarm_id = event_key[len("ignore_"):]
+ order_id = event_key[len("ignore_"):]
elif event_key.startswith("complete_"):
action = "complete"
- alarm_id = event_key[len("complete_"):]
+ order_id = event_key[len("complete_"):]
elif event_key.startswith("false_"):
action = "false"
- alarm_id = event_key[len("false_"):]
+ order_id = event_key[len("false_"):]
elif event_key.startswith("done_"):
return # 终态按钮,忽略
else:
@@ -169,131 +171,99 @@ async def _process_card_button_click(msg: dict):
wechat = get_wechat_service()
wo_client = get_work_order_client()
- # 保存 response_code 供后续 IoT 回调时更新卡片
+ # 保存 response_code(以 order_id 为 key)
if response_code:
- wechat.save_response_code(alarm_id, response_code)
+ alarm_id = wechat.get_alarm_id_for_order(order_id)
+ wechat.save_response_code(order_id, response_code, alarm_id=alarm_id)
- # 获取关联的工单ID
- order_id = _get_order_id_for_alarm(alarm_id)
+ # 反查 alarm_id(可空,手动工单没有关联告警)
+ alarm_id = wechat.get_alarm_id_for_order(order_id)
# ---- 确认接单:调 IoT /confirm ----
if action == "confirm":
- if order_id and wo_client.enabled:
+ if wo_client.enabled:
success = await wo_client.confirm_order(order_id)
if success:
- logger.info(f"IoT工单已确认,等待sync-status回调: alarm={alarm_id}, order={order_id}, by={user_id}")
+ logger.info(f"IoT工单已确认,等待sync-status回调: order={order_id}, by={user_id}")
return # IoT 回调 sync-status 会更新告警+卡片
- # IoT 调用失败时降级直接更新
- logger.warning(f"IoT工单确认失败,降级处理: alarm={alarm_id}, order={order_id}")
- service = get_alarm_event_service()
- service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
- handle_status="HANDLING", handler=user_id, remark="企微确认(IoT降级)")
- else:
- # 工单未启用时直接更新告警状态
+ logger.warning(f"IoT工单确认失败: order={order_id}")
+
+ # 有告警时更新本地告警状态
+ if alarm_id:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
handle_status="HANDLING", handler=user_id, remark="企微确认接单")
- # 降级时才更新卡片
+ # 更新卡片
if response_code:
await wechat.update_alarm_card_step2(
response_code=response_code,
user_ids=[user_id],
- alarm_id=alarm_id,
+ order_id=order_id,
operator_name=user_id,
)
# ---- 误报忽略:调 IoT /false-alarm ----
elif action == "ignore":
- if order_id and wo_client.enabled:
+ if wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if success:
- logger.info(f"IoT工单已标记误报,等待sync-status回调: alarm={alarm_id}, order={order_id}, by={user_id}")
- return # IoT 回调 sync-status 会更新告警+卡片
- logger.warning(f"IoT误报标记失败,降级处理: alarm={alarm_id}, order={order_id}")
- service = get_alarm_event_service()
- service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
- handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)")
- else:
+ logger.info(f"IoT工单已标记误报,等待sync-status回调: order={order_id}, by={user_id}")
+ return
+
+ if alarm_id:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微误报忽略")
- # 降级时才更新卡片
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
- alarm_id=alarm_id, action="ignore", operator_name=user_id,
+ order_id=order_id, action="ignore", operator_name=user_id,
)
- # ---- 已处理完成(step2按钮,保留兼容):调 IoT /submit ----
+ # ---- 已处理完成(step2按钮):调 IoT /submit ----
elif action == "complete":
- if order_id and wo_client.enabled:
+ if wo_client.enabled:
success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}")
if success:
- logger.info(f"IoT工单已提交,等待sync-status回调: alarm={alarm_id}, order={order_id}")
- return # IoT 回调 sync-status 会更新告警+卡片
- service = get_alarm_event_service()
- service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
- handle_status="DONE", handler=user_id, remark="企微完成(IoT降级)")
- else:
+ logger.info(f"IoT工单已提交,等待sync-status回调: order={order_id}")
+ return
+
+ if alarm_id:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id, remark="企微已处理")
- # 降级时才更新卡片
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
- alarm_id=alarm_id, action="complete", operator_name=user_id,
+ order_id=order_id, action="complete", operator_name=user_id,
)
- # ---- 标记误报(step2按钮,保留兼容)----
+ # ---- 标记误报(step2按钮)----
elif action == "false":
- if order_id and wo_client.enabled:
+ if wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if success:
- logger.info(f"IoT工单已标记误报,等待sync-status回调: alarm={alarm_id}, order={order_id}")
- return # IoT 回调 sync-status 会更新告警+卡片
- service = get_alarm_event_service()
- service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
- handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)")
- else:
+ logger.info(f"IoT工单已标记误报,等待sync-status回调: order={order_id}")
+ return
+
+ if alarm_id:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微标记误报")
- # 降级时才更新卡片
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
- alarm_id=alarm_id, action="false", operator_name=user_id,
+ order_id=order_id, action="false", operator_name=user_id,
)
except Exception as e:
logger.error(f"处理卡片按钮点击失败: {e}", exc_info=True)
-def _get_order_id_for_alarm(alarm_id: str) -> str:
- """从 alarm_event_ext 中获取关联的工单ID"""
- from app.models import get_session, AlarmEventExt
-
- db = get_session()
- try:
- ext = db.query(AlarmEventExt).filter(
- AlarmEventExt.alarm_id == alarm_id,
- AlarmEventExt.ext_type == "WORK_ORDER",
- ).first()
- if ext and ext.ext_data:
- return ext.ext_data.get("order_id", "")
- return ""
- except Exception as e:
- logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}")
- return ""
- finally:
- db.close()
-
-
# ==================== Agent测试接口(开发用) ====================
@router.post("/agent/test")
diff --git a/app/routers/wechat_notify_api.py b/app/routers/wechat_notify_api.py
index 3813308..e4a546e 100644
--- a/app/routers/wechat_notify_api.py
+++ b/app/routers/wechat_notify_api.py
@@ -18,7 +18,7 @@ router = APIRouter(prefix="/api/wechat/notify", tags=["企微通知-IoT回调"])
class SendCardRequest(BaseModel):
"""IoT 派单后发送企微卡片"""
- alarmId: str
+ alarmId: Optional[str] = ""
orderId: str
userIds: List[str]
title: str
@@ -31,7 +31,7 @@ class SendCardRequest(BaseModel):
class SyncStatusRequest(BaseModel):
"""IoT 工单状态变更后同步"""
- alarmId: str
+ alarmId: Optional[str] = ""
orderId: str
status: str # dispatched / confirmed / completed / false_alarm / auto_resolved
operator: Optional[str] = ""
@@ -107,28 +107,29 @@ async def send_card(request: Request):
camera_name = req.cameraName or ""
alarm_type_code = ""
alarm_snapshot_key = ""
- from app.models import get_session, AlarmEvent
- from app.services.camera_name_service import get_camera_name_service
- db = get_session()
- try:
- alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == req.alarmId).first()
- if alarm:
- alarm_type_code = alarm.alarm_type or ""
- alarm_snapshot_key = alarm.snapshot_url or ""
- if not camera_name or camera_name == "未知":
- if alarm.device_id:
- camera_service = get_camera_name_service()
- camera_info = await camera_service.get_camera_info(alarm.device_id)
- camera_name = camera_service.format_display_name(alarm.device_id, camera_info)
- finally:
- db.close()
+ if req.alarmId:
+ from app.models import get_session, AlarmEvent
+ from app.services.camera_name_service import get_camera_name_service
+ db = get_session()
+ try:
+ alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == req.alarmId).first()
+ if alarm:
+ alarm_type_code = alarm.alarm_type or ""
+ alarm_snapshot_key = alarm.snapshot_url or ""
+ if not camera_name or camera_name == "未知":
+ if alarm.device_id:
+ camera_service = get_camera_name_service()
+ camera_info = await camera_service.get_camera_info(alarm.device_id)
+ camera_name = camera_service.format_display_name(alarm.device_id, camera_info)
+ finally:
+ db.close()
# 截图预签名:优先用告警表的 object key(能正确签名)
if not presigned_url or not snapshot_url:
if alarm_snapshot_key:
presigned_url = _get_presigned_url(alarm_snapshot_key)
- logger.info(f"群聊截图诊断: alarm={req.alarmId}, "
+ logger.info(f"群聊截图诊断: order={req.orderId}, alarm={req.alarmId}, "
f"iot_snapshot={req.snapshotUrl!r}, "
f"db_snapshot={alarm_snapshot_key!r}, "
f"presigned_url={presigned_url[:80] if presigned_url else '(空)'}...")
@@ -138,7 +139,7 @@ async def send_card(request: Request):
await wechat.send_group_alarm_combo(
chat_id=group_chat_id,
- alarm_id=req.alarmId,
+ alarm_id=req.alarmId or req.orderId,
alarm_type=actual_alarm_type,
area_name=req.areaName or "",
camera_name=camera_name,
@@ -149,9 +150,8 @@ async def send_card(request: Request):
mention_user_ids=req.userIds,
)
- # 私发卡片
- # 从告警表获取 alarm_type_code(避免重复"告警")
- if not alarm_type_code:
+ # 私发卡片(以 order_id 为主键)
+ if not alarm_type_code and req.alarmId:
from app.models import get_session, AlarmEvent
db = get_session()
try:
@@ -162,17 +162,18 @@ async def send_card(request: Request):
sent = await wechat.send_alarm_card(
user_ids=req.userIds,
- alarm_id=req.alarmId,
+ order_id=req.orderId,
alarm_type=alarm_type_code or req.title,
area_name=req.areaName or "",
camera_name=camera_name if 'camera_name' in dir() else (req.cameraName or ""),
description=f"工单编号:{req.orderId}",
event_time=req.eventTime or "",
alarm_level=req.level or 2,
+ alarm_id=req.alarmId or "",
)
if sent:
- logger.info(f"IoT回调发卡片成功: alarm={req.alarmId}, order={req.orderId}, users={req.userIds}")
+ logger.info(f"IoT回调发卡片成功: order={req.orderId}, alarm={req.alarmId}, users={req.userIds}")
return {"code": 0, "msg": "success"}
else:
return {"code": -1, "msg": "发送失败"}
@@ -197,49 +198,56 @@ async def sync_status(request: Request):
service = get_alarm_event_service()
wechat = get_wechat_service()
+ # 用 order_id 查 response_code 和 alarm_id
+ order_id = req.orderId
+ alarm_id = req.alarmId or wechat.get_alarm_id_for_order(order_id)
+
if req.status == "dispatched":
# 派单:不更新告警,不发企微(企微由 send-card 接口发)
- logger.info(f"dispatched 已记录: alarm={req.alarmId}, order={req.orderId}")
+ logger.info(f"dispatched 已记录: order={order_id}, alarm={alarm_id}")
return {"code": 0, "msg": "success"}
elif req.status == "confirmed":
# 确认接单:更新卡片到第二步(不更新告警)
if wechat.enabled:
- response_code = wechat.get_response_code(req.alarmId)
+ response_code = wechat.get_response_code(order_id)
if response_code:
await wechat.update_alarm_card_step2(
response_code=response_code,
user_ids=[req.operator] if req.operator else [],
- alarm_id=req.alarmId,
+ order_id=order_id,
operator_name=req.operator,
)
- logger.info(f"卡片已更新到步骤2: alarm={req.alarmId}")
+ logger.info(f"卡片已更新到步骤2: order={order_id}")
return {"code": 0, "msg": "success"}
elif req.status in ("completed", "false_alarm", "auto_resolved"):
- # 终态:更新告警状态 + 更新卡片
+ # 终态:更新告警状态(如果有关联告警)+ 更新卡片
terminal_map = {
"completed": {"alarm_status": "CLOSED", "handle_status": "DONE", "card_action": "complete"},
"false_alarm": {"alarm_status": "FALSE", "handle_status": "IGNORED", "card_action": "false"},
"auto_resolved": {"alarm_status": "CLOSED", "handle_status": "DONE", "card_action": "auto_resolve"},
}
mapping = terminal_map[req.status]
- service.handle_alarm(
- alarm_id=req.alarmId,
- alarm_status=mapping["alarm_status"],
- handle_status=mapping["handle_status"],
- handler=req.operator or "",
- remark=req.remark or f"IoT工单: {req.status}",
- )
- logger.info(f"告警终态已同步: alarm={req.alarmId}, status={req.status}")
+
+ # 仅在有关联告警时更新告警状态
+ if alarm_id:
+ service.handle_alarm(
+ alarm_id=alarm_id,
+ alarm_status=mapping["alarm_status"],
+ handle_status=mapping["handle_status"],
+ handler=req.operator or "",
+ remark=req.remark or f"IoT工单: {req.status}",
+ )
+ logger.info(f"告警终态已同步: alarm={alarm_id}, status={req.status}")
if wechat.enabled:
- response_code = wechat.get_response_code(req.alarmId)
+ response_code = wechat.get_response_code(order_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code,
user_ids=[req.operator] if req.operator else [],
- alarm_id=req.alarmId,
+ order_id=order_id,
action=mapping["card_action"],
operator_name=req.operator,
)
diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py
index 4fde2e0..0351ae3 100644
--- a/app/routers/yudao_aiot_alarm.py
+++ b/app/routers/yudao_aiot_alarm.py
@@ -257,9 +257,13 @@ async def _sync_work_order_and_card(alarm_id: str, alarm_status: str, operator:
"""前端操作后:优先调 IoT 工单 API,降级直接更新卡片"""
try:
from app.services.work_order_client import get_work_order_client
+ from app.services.wechat_service import get_wechat_service
+ wechat = get_wechat_service()
wo_client = get_work_order_client()
- order_id = _get_order_id_for_alarm(alarm_id) if wo_client.enabled else ""
+
+ # 通过 wechat_card_state 查 order_id
+ order_id = wechat.get_order_id_for_alarm(alarm_id) if wo_client.enabled else ""
if order_id and wo_client.enabled:
if alarm_status == "CLOSED":
@@ -267,45 +271,23 @@ async def _sync_work_order_and_card(alarm_id: str, alarm_status: str, operator:
else: # FALSE
success = await wo_client.false_alarm(order_id)
if success:
- logger.info(f"前端操作已同步IoT工单: alarm={alarm_id}, status={alarm_status}")
+ logger.info(f"前端操作已同步IoT工单: alarm={alarm_id}, order={order_id}, status={alarm_status}")
return # IoT 回调 sync-status 会更新卡片
- # 降级:直接更新卡片
- from app.services.wechat_service import get_wechat_service
- wechat = get_wechat_service()
- if wechat.enabled:
- response_code = wechat.get_response_code(alarm_id)
+ # 降级:直接更新卡片(用 order_id 查 response_code)
+ if wechat.enabled and order_id:
+ response_code = wechat.get_response_code(order_id)
if response_code:
action = "complete" if alarm_status == "CLOSED" else "false"
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[],
- alarm_id=alarm_id, action=action, operator_name=operator,
+ order_id=order_id, action=action, operator_name=operator,
)
- logger.info(f"降级直接更新卡片: alarm={alarm_id}, action={action}")
+ logger.info(f"降级直接更新卡片: order={order_id}, action={action}")
except Exception as e:
logger.error(f"同步工单/卡片失败: alarm={alarm_id}, error={e}", exc_info=True)
-def _get_order_id_for_alarm(alarm_id: str) -> str:
- """从 alarm_event_ext 中获取关联的工单ID"""
- from app.models import get_session, AlarmEventExt
-
- db = get_session()
- try:
- ext = db.query(AlarmEventExt).filter(
- AlarmEventExt.alarm_id == alarm_id,
- AlarmEventExt.ext_type == "WORK_ORDER",
- ).first()
- if ext and ext.ext_data:
- return ext.ext_data.get("order_id", "")
- return ""
- except Exception as e:
- logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}")
- return ""
- finally:
- db.close()
-
-
@router.delete("/alert/delete")
async def delete_alert(
alarmId: Optional[str] = Query(None, description="告警ID"),
diff --git a/app/services/agent/prompts.py b/app/services/agent/prompts.py
index 8c4a5da..ecd3bd0 100644
--- a/app/services/agent/prompts.py
+++ b/app/services/agent/prompts.py
@@ -2,17 +2,22 @@
Agent Prompt 定义
"""
-SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人员处理告警和工单。
+SYSTEM_PROMPT = """你是VSP物业AI助手,通过企业微信协助物业人员处理安保和保洁工单。
## 能力(必须通过工具获取数据)
-1. 查询告警统计和明细(query_alarm_stats / list_alarms / get_alarm_detail)
-2. 处理告警(update_alarm_status:确认接单、忽略、处理完成、误报)
-3. 提交工单处理结果(submit_order_result:含文字描述和处理后照片)
-4. 查询待处理工单(list_my_orders)
-5. 查询摄像头信息(query_camera)
+1. 查询工单统计(query_order_stats)— 支持安保和保洁工单
+2. 查询工单列表(list_orders)— 按类型、状态、时间筛选
+3. 查看工单详情(get_order_detail)— 含告警信息、保洁信息
+4. 操作工单状态(update_order_status)— 确认接单、完成、误报、忽略
+5. 提交处理结果(submit_order_result)— 含文字描述和处理后照片
+6. 查询摄像头信息(query_camera)
+
+## 工单类型
+- SECURITY:安保工单(关联告警,含摄像头、告警类型等)
+- CLEAN:保洁工单(含保洁类型、难度、预计时长等)
## 核心原则(严格遵守)
-- 所有数据必须来自工具调用结果,绝对不要编造告警ID、告警数量、摄像头名称、时间等任何数据
+- 所有数据必须来自工具调用结果,绝对不要编造工单ID、数量、人员等任何数据
- 如果工具返回错误或未找到数据,如实告知用户,不要猜测或补充
- 不知道的事情直接说"我无法确认",不要推测
- 不要编造不存在的功能或操作
@@ -23,8 +28,7 @@ SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人
- 回复简洁,适合手机阅读
- 重要信息用【】标注
- 禁止使用markdown语法(如、**加粗**、# 标题),企微聊天不支持
-- 告警截图会自动发送图片消息,文字回复中不要包含图片链接
-- 用户问非安防相关问题时,简短回答"我只能协助处理安防告警和工单相关事务"
+- 用户问非物业相关问题时,简短回答"我只能协助处理物业工单相关事务"
"""
IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。
diff --git a/app/services/agent/tools/__init__.py b/app/services/agent/tools/__init__.py
index 292740a..6d904d2 100644
--- a/app/services/agent/tools/__init__.py
+++ b/app/services/agent/tools/__init__.py
@@ -2,18 +2,16 @@
工具注册表:导出 all_tools 供图构建使用
"""
-from .alarm_query import query_alarm_stats, list_alarms, get_alarm_detail
-from .alarm_action import update_alarm_status
-from .order_tools import list_my_orders, submit_order_result
+from .order_query import query_order_stats, list_orders, get_order_detail
+from .order_action import update_order_status, submit_order_result
from .camera_tools import query_camera
# 所有工具列表 — 添加新工具只需在这里追加
all_tools = [
- query_alarm_stats,
- list_alarms,
- get_alarm_detail,
- update_alarm_status,
- list_my_orders,
+ query_order_stats,
+ list_orders,
+ get_order_detail,
+ update_order_status,
submit_order_result,
query_camera,
]
diff --git a/app/services/agent/tools/camera_tools.py b/app/services/agent/tools/camera_tools.py
index 5865566..7fd38a0 100644
--- a/app/services/agent/tools/camera_tools.py
+++ b/app/services/agent/tools/camera_tools.py
@@ -5,7 +5,7 @@
import json
from langchain_core.tools import tool
-from .alarm_query import _get_camera_display_name
+from .order_query import _get_camera_display_name
@tool
diff --git a/app/services/agent/tools/order_action.py b/app/services/agent/tools/order_action.py
new file mode 100644
index 0000000..8b25c86
--- /dev/null
+++ b/app/services/agent/tools/order_action.py
@@ -0,0 +1,182 @@
+"""
+工单操作工具:确认接单、完成、误报、提交处理结果
+"""
+
+import json
+from typing import Optional, List
+from langchain_core.tools import tool
+from langchain_core.runnables import RunnableConfig
+
+from app.utils.logger import logger
+
+
+async def _update_wechat_card(order_id: str, user_id: str, action: str):
+ """更新企微卡片状态(以 order_id 为 key)"""
+ try:
+ from app.services.wechat_service import get_wechat_service
+ wechat = get_wechat_service()
+ response_code = wechat.get_response_code(order_id)
+ if not response_code:
+ return
+
+ if action == "confirm":
+ await wechat.update_alarm_card_step2(
+ response_code=response_code, user_ids=[user_id],
+ order_id=order_id, operator_name=user_id,
+ )
+ else:
+ await wechat.update_alarm_card_terminal(
+ response_code=response_code, user_ids=[user_id],
+ order_id=order_id, action=action, operator_name=user_id,
+ )
+ except Exception as e:
+ logger.error(f"更新企微卡片失败: order={order_id}, error={e}")
+
+
+@tool
+async def update_order_status(order_id: str, action: str, config: RunnableConfig) -> str:
+ """更新工单状态:确认接单(confirm)、处理完成(complete)、标记误报(false)、忽略(ignore)
+
+ Args:
+ order_id: 工单ID(ops_order.id)
+ action: 操作类型 confirm=确认接单 complete=处理完成 false=标记误报 ignore=忽略
+ """
+ user_id = config.get("configurable", {}).get("user_id", "")
+
+ from app.services.work_order_client import get_work_order_client
+ wo_client = get_work_order_client()
+
+ if not wo_client.enabled:
+ return json.dumps({"error": "工单系统未启用"}, ensure_ascii=False)
+
+ # IoT 工单操作
+ iot_success = False
+ if action == "confirm":
+ iot_success = await wo_client.confirm_order(order_id)
+ elif action in ("ignore", "false"):
+ iot_success = await wo_client.false_alarm(order_id)
+ elif action == "complete":
+ iot_success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}")
+
+ if not iot_success:
+ return json.dumps({"error": f"工单操作失败: order={order_id}, action={action}"}, ensure_ascii=False)
+
+ # 有关联告警时同步告警状态
+ from app.services.wechat_service import get_wechat_service
+ wechat = get_wechat_service()
+ alarm_id = wechat.get_alarm_id_for_order(order_id)
+
+ if alarm_id:
+ action_map = {
+ "confirm": ("CONFIRMED", "HANDLING", "Agent确认接单"),
+ "ignore": ("FALSE", "IGNORED", "Agent忽略"),
+ "complete": ("CLOSED", "DONE", "Agent已处理"),
+ "false": ("FALSE", "IGNORED", "Agent标记误报"),
+ }
+ if action in action_map:
+ alarm_status, handle_status, remark = action_map[action]
+ from app.services.alarm_event_service import get_alarm_event_service
+ svc = get_alarm_event_service()
+ svc.handle_alarm(
+ alarm_id=alarm_id, alarm_status=alarm_status,
+ handle_status=handle_status, handler=user_id, remark=remark,
+ )
+
+ # 更新企微卡片
+ await _update_wechat_card(order_id, user_id, action)
+
+ action_labels = {
+ "confirm": "已确认接单",
+ "ignore": "已忽略",
+ "complete": "已处理完成",
+ "false": "已标记误报",
+ }
+ return json.dumps(
+ {"success": True, "message": f"{action_labels.get(action, action)}: 工单{order_id}"},
+ ensure_ascii=False,
+ )
+
+
+@tool
+async def submit_order_result(
+ order_id: str,
+ result_text: str,
+ config: RunnableConfig,
+ image_urls: Optional[List[str]] = None,
+) -> str:
+ """提交工单处理结果(文字描述+处理后照片URL)
+
+ Args:
+ order_id: 工单ID(ops_order.id)
+ result_text: 处理结果描述
+ image_urls: 处理后照片URL列表(COS永久URL)
+ """
+ user_id = config.get("configurable", {}).get("user_id", "")
+
+ if image_urls is None:
+ image_urls = []
+
+ # 合并 session 中暂存的图片
+ from app.services.session_manager import get_session_manager
+ session = get_session_manager().get(user_id)
+ if session.pending_images:
+ image_urls = session.pending_images + image_urls
+ session.pending_images = []
+
+ from app.services.work_order_client import get_work_order_client
+ wo_client = get_work_order_client()
+
+ if wo_client.enabled:
+ success = await wo_client.submit_order(
+ order_id,
+ result=f"{result_text} by {user_id}",
+ result_img_urls=image_urls or None,
+ )
+ if not success:
+ return json.dumps({"error": f"提交工单结果失败: order={order_id}"}, ensure_ascii=False)
+
+ # 有关联告警时同步告警状态
+ from app.services.wechat_service import get_wechat_service
+ wechat = get_wechat_service()
+ alarm_id = wechat.get_alarm_id_for_order(order_id)
+
+ if alarm_id:
+ from app.services.alarm_event_service import get_alarm_event_service
+ svc = get_alarm_event_service()
+ remark = f"Agent结单: {result_text}"
+ if image_urls:
+ remark += f" (附{len(image_urls)}张图片)"
+ svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
+ handle_status="DONE", handler=user_id, remark=remark)
+
+ # 持久化处理结果图片到 alarm_event_ext
+ if image_urls:
+ try:
+ from app.models import get_session as get_db_session, AlarmEventExt
+ db = get_db_session()
+ try:
+ ext = AlarmEventExt(
+ alarm_id=alarm_id,
+ ext_type="HANDLER_RESULT",
+ ext_data={"result_text": result_text, "image_urls": image_urls, "handler": user_id},
+ )
+ db.add(ext)
+ db.commit()
+ except Exception as e:
+ db.rollback()
+ logger.error(f"持久化处理结果图片失败: {e}")
+ finally:
+ db.close()
+ except Exception as e:
+ logger.error(f"保存处理结果失败: {e}")
+
+ # 更新卡片到终态
+ await _update_wechat_card(order_id, user_id, "complete")
+
+ result = {
+ "success": True,
+ "message": f"工单已提交: {order_id}",
+ "result": result_text,
+ "images_count": len(image_urls),
+ }
+ return json.dumps(result, ensure_ascii=False)
diff --git a/app/services/agent/tools/order_query.py b/app/services/agent/tools/order_query.py
new file mode 100644
index 0000000..09e14d6
--- /dev/null
+++ b/app/services/agent/tools/order_query.py
@@ -0,0 +1,334 @@
+"""
+工单查询工具:统计、列表、详情(查 IoT ops_order + 扩展表)
+支持安保工单(SECURITY)和保洁工单(CLEAN)
+"""
+
+import json
+from datetime import timedelta
+from typing import Optional
+from langchain_core.tools import tool
+from langchain_core.runnables import RunnableConfig
+
+from app.utils.logger import logger
+from app.utils.timezone import beijing_now
+
+
+# 告警类型中文映射
+ALARM_TYPE_NAMES = {
+ "leave_post": "人员离岗", "intrusion": "周界入侵",
+ "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵",
+}
+
+# 工单状态映射
+ORDER_STATUS_NAMES = {
+ "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
+ "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
+}
+
+# 工单优先级映射
+PRIORITY_NAMES = {0: "低", 1: "中", 2: "高"}
+
+# 保洁类型映射
+CLEANING_TYPE_NAMES = {
+ "ROUTINE": "日常保洁", "DEEP": "深度保洁",
+ "SPOT": "点状保洁", "EMERGENCY": "应急保洁",
+}
+
+
+def _parse_time_range(time_range: str):
+ """解析时间范围,返回 (start_time, label)"""
+ now = beijing_now()
+ if time_range == "week":
+ start = now - timedelta(days=now.weekday())
+ start = start.replace(hour=0, minute=0, second=0, microsecond=0)
+ return start, "本周"
+ elif time_range == "month":
+ start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
+ return start, "本月"
+ elif time_range == "yesterday":
+ start = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+ end = now.replace(hour=0, minute=0, second=0, microsecond=0)
+ return start, "昨日"
+ else:
+ start = now.replace(hour=0, minute=0, second=0, microsecond=0)
+ return start, "今日"
+
+
+def _get_camera_display_name(device_id: str) -> str:
+ """同步获取摄像头显示名称"""
+ try:
+ import asyncio
+ from app.services.camera_name_service import get_camera_name_service
+ camera_service = get_camera_name_service()
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ import concurrent.futures
+ with concurrent.futures.ThreadPoolExecutor() as pool:
+ cam_info = pool.submit(
+ asyncio.run, camera_service.get_camera_info(device_id)
+ ).result(timeout=5)
+ else:
+ cam_info = asyncio.run(camera_service.get_camera_info(device_id))
+ return camera_service.format_display_name(device_id, cam_info)
+ except Exception:
+ return device_id
+
+
+def _query_orders(
+ order_type: Optional[str] = None,
+ status: Optional[str] = None,
+ start_time=None,
+ end_time=None,
+ limit: int = 100,
+ assignee_name: Optional[str] = None,
+):
+ """查询 IoT 工单(跨库只读)"""
+ from app.models_iot import get_iot_session, IotOpsOrder
+ db = get_iot_session()
+ try:
+ q = db.query(IotOpsOrder).filter(IotOpsOrder.deleted == 0)
+ if order_type and order_type != "ALL":
+ q = q.filter(IotOpsOrder.order_type == order_type)
+ if status:
+ q = q.filter(IotOpsOrder.status == status)
+ if start_time:
+ q = q.filter(IotOpsOrder.create_time >= start_time)
+ if end_time:
+ q = q.filter(IotOpsOrder.create_time < end_time)
+ if assignee_name:
+ q = q.filter(IotOpsOrder.assignee_name.contains(assignee_name))
+
+ total = q.count()
+ orders = q.order_by(IotOpsOrder.create_time.desc()).limit(limit).all()
+
+ # 提取所有 order_id 用于关联查询扩展表
+ order_ids = [o.id for o in orders]
+
+ # 批量查安保扩展
+ sec_ext_map = {}
+ if order_ids:
+ from app.models_iot import IotOpsOrderSecurityExt
+ sec_exts = db.query(IotOpsOrderSecurityExt).filter(
+ IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
+ IotOpsOrderSecurityExt.deleted == 0,
+ ).all()
+ sec_ext_map = {e.ops_order_id: e for e in sec_exts}
+
+ # 批量查保洁扩展
+ clean_ext_map = {}
+ if order_ids:
+ from app.models_iot import IotOpsOrderCleanExt
+ clean_exts = db.query(IotOpsOrderCleanExt).filter(
+ IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
+ IotOpsOrderCleanExt.deleted == 0,
+ ).all()
+ clean_ext_map = {e.ops_order_id: e for e in clean_exts}
+
+ return orders, total, sec_ext_map, clean_ext_map
+ except Exception as e:
+ logger.error(f"查询IoT工单失败: {e}", exc_info=True)
+ return [], 0, {}, {}
+ finally:
+ db.close()
+
+
+@tool
+def query_order_stats(time_range: str = "today", order_type: str = "ALL") -> str:
+ """查询工单统计数据(总数、按状态分布、按类型分布)
+
+ Args:
+ time_range: 时间范围 today=今日 week=本周 month=本月 yesterday=昨日
+ order_type: 工单类型筛选 SECURITY=安保 CLEAN=保洁 ALL=全部
+ """
+ start, range_label = _parse_time_range(time_range)
+ now = beijing_now()
+ end = now
+ if time_range == "yesterday":
+ end = now.replace(hour=0, minute=0, second=0, microsecond=0)
+
+ orders, total, sec_ext_map, clean_ext_map = _query_orders(
+ order_type=order_type if order_type != "ALL" else None,
+ start_time=start, end_time=end, limit=10000,
+ )
+
+ # 按状态统计
+ status_count = {}
+ for o in orders:
+ s = o.status or "PENDING"
+ status_count[s] = status_count.get(s, 0) + 1
+
+ # 按类型统计
+ type_count = {"SECURITY": 0, "CLEAN": 0}
+ alarm_type_count = {}
+ for o in orders:
+ ot = o.order_type or "SECURITY"
+ type_count[ot] = type_count.get(ot, 0) + 1
+ # 安保工单细分告警类型
+ sec_ext = sec_ext_map.get(o.id)
+ if sec_ext and sec_ext.alarm_type:
+ alarm_type_count[sec_ext.alarm_type] = alarm_type_count.get(sec_ext.alarm_type, 0) + 1
+
+ # 误报统计(安保特有)
+ false_alarm_count = sum(
+ 1 for e in sec_ext_map.values() if e.false_alarm == 1
+ )
+
+ result = {
+ "range": range_label,
+ "total": total,
+ "by_status": {ORDER_STATUS_NAMES.get(s, s): c for s, c in status_count.items()},
+ "by_order_type": {k: v for k, v in type_count.items() if v > 0},
+ "by_alarm_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in alarm_type_count.items()} if alarm_type_count else {},
+ "false_alarm_count": false_alarm_count,
+ }
+ return json.dumps(result, ensure_ascii=False)
+
+
+@tool
+def list_orders(
+ time_range: str = "today",
+ order_type: str = "ALL",
+ status: str = "",
+ limit: int = 10,
+ assigned_to_me: bool = False,
+ config: RunnableConfig = None,
+) -> str:
+ """查询工单列表,返回最近的工单记录
+
+ Args:
+ time_range: 时间范围 today/week/month/yesterday
+ order_type: 工单类型 SECURITY=安保 CLEAN=保洁 ALL=全部
+ status: 状态筛选 PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED
+ limit: 返回条数,默认10,最多20
+ assigned_to_me: 是否只看我的工单
+ """
+ start, range_label = _parse_time_range(time_range)
+ now = beijing_now()
+ end = now
+ if time_range == "yesterday":
+ end = now.replace(hour=0, minute=0, second=0, microsecond=0)
+
+ limit = min(limit, 20)
+
+ # 获取当前用户
+ user_id = ""
+ if config and assigned_to_me:
+ user_id = config.get("configurable", {}).get("user_id", "")
+
+ orders, total, sec_ext_map, clean_ext_map = _query_orders(
+ order_type=order_type if order_type != "ALL" else None,
+ status=status or None,
+ start_time=start, end_time=end, limit=limit,
+ )
+
+ items = []
+ for o in orders:
+ create_time = ""
+ if o.create_time:
+ try:
+ create_time = o.create_time.strftime("%m-%d %H:%M")
+ except Exception:
+ create_time = str(o.create_time)[:16]
+
+ item = {
+ "order_id": str(o.id),
+ "order_code": o.order_code or "",
+ "type": o.order_type or "SECURITY",
+ "title": o.title or "",
+ "status": ORDER_STATUS_NAMES.get(o.status, o.status or "待处理"),
+ "priority": PRIORITY_NAMES.get(o.priority, "中"),
+ "assignee": o.assignee_name or "",
+ "time": create_time,
+ }
+
+ # 安保扩展
+ sec_ext = sec_ext_map.get(o.id)
+ if sec_ext:
+ item["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "")
+ item["camera"] = sec_ext.camera_name or _get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else ""
+ item["false_alarm"] = bool(sec_ext.false_alarm)
+
+ # 保洁扩展
+ clean_ext = clean_ext_map.get(o.id)
+ if clean_ext:
+ item["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "")
+ item["difficulty"] = clean_ext.difficulty_level
+
+ items.append(item)
+
+ result = {"range": range_label, "total": total, "items": items}
+ return json.dumps(result, ensure_ascii=False)
+
+
+@tool
+def get_order_detail(order_id: str, config: RunnableConfig) -> str:
+ """查询单条工单的详细信息
+
+ Args:
+ order_id: 工单ID(ops_order.id)
+ """
+ from app.models_iot import get_iot_session, IotOpsOrder, IotOpsOrderSecurityExt, IotOpsOrderCleanExt
+ db = get_iot_session()
+ try:
+ order = db.query(IotOpsOrder).filter(
+ IotOpsOrder.id == int(order_id),
+ IotOpsOrder.deleted == 0,
+ ).first()
+ if not order:
+ return json.dumps({"error": f"未找到工单: {order_id}"}, ensure_ascii=False)
+
+ result = {
+ "order_id": str(order.id),
+ "order_code": order.order_code or "",
+ "order_type": order.order_type or "",
+ "title": order.title or "",
+ "description": order.description or "",
+ "status": ORDER_STATUS_NAMES.get(order.status, order.status or ""),
+ "priority": PRIORITY_NAMES.get(order.priority, "中"),
+ "assignee": order.assignee_name or "",
+ "location": order.location or "",
+ "create_time": order.create_time.strftime("%Y-%m-%d %H:%M:%S") if order.create_time else "",
+ }
+
+ # 安保扩展
+ sec_ext = db.query(IotOpsOrderSecurityExt).filter(
+ IotOpsOrderSecurityExt.ops_order_id == order.id,
+ IotOpsOrderSecurityExt.deleted == 0,
+ ).first()
+ if sec_ext:
+ result["alarm_id"] = sec_ext.alarm_id or ""
+ result["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "")
+ result["camera_name"] = sec_ext.camera_name or (
+ _get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else ""
+ )
+ result["has_image"] = bool(sec_ext.image_url)
+ result["false_alarm"] = bool(sec_ext.false_alarm)
+ result["result"] = sec_ext.result or ""
+ if sec_ext.dispatched_time:
+ result["dispatched_time"] = sec_ext.dispatched_time.strftime("%Y-%m-%d %H:%M:%S")
+ if sec_ext.confirmed_time:
+ result["confirmed_time"] = sec_ext.confirmed_time.strftime("%Y-%m-%d %H:%M:%S")
+ if sec_ext.completed_time:
+ result["completed_time"] = sec_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S")
+
+ # 保洁扩展
+ clean_ext = db.query(IotOpsOrderCleanExt).filter(
+ IotOpsOrderCleanExt.ops_order_id == order.id,
+ IotOpsOrderCleanExt.deleted == 0,
+ ).first()
+ if clean_ext:
+ result["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "")
+ result["difficulty"] = clean_ext.difficulty_level
+ result["expected_duration"] = clean_ext.expected_duration
+ result["is_auto"] = bool(clean_ext.is_auto)
+ if clean_ext.arrived_time:
+ result["arrived_time"] = clean_ext.arrived_time.strftime("%Y-%m-%d %H:%M:%S")
+ if clean_ext.completed_time:
+ result["clean_completed_time"] = clean_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S")
+
+ return json.dumps(result, ensure_ascii=False)
+ except Exception as e:
+ logger.error(f"查询工单详情失败: {e}", exc_info=True)
+ return json.dumps({"error": f"查询失败: {e}"}, ensure_ascii=False)
+ finally:
+ db.close()
diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py
index 95e3c9e..1bd4bc8 100644
--- a/app/services/daily_report_service.py
+++ b/app/services/daily_report_service.py
@@ -1,8 +1,8 @@
"""
-每日告警日报定时推送服务
+每日工单日报定时推送服务
-每天定时生成前一天的告警汇总,发送到企微群聊。
-支持按边缘节点分组统计。
+每天定时生成前一天的工单汇总,发送到企微群聊。
+数据源:IoT ops_order + 安保/保洁扩展表。
"""
import asyncio
from collections import Counter, defaultdict
@@ -12,7 +12,7 @@ from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
-# 告警类型中文映射(与 wechat_service 保持一致)
+# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
@@ -20,23 +20,21 @@ ALARM_TYPE_NAMES = {
"vehicle_congestion": "车辆拥堵",
}
+# 保洁类型映射
+CLEANING_TYPE_NAMES = {
+ "ROUTINE": "日常保洁", "DEEP": "深度保洁",
+ "SPOT": "点状保洁", "EMERGENCY": "应急保洁",
+}
+
+# 工单状态映射
+ORDER_STATUS_NAMES = {
+ "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
+ "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
+}
+
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
-def _get_edge_name_map() -> Dict[str, str]:
- """从 EdgeDevice 表获取 edge_node_id → device_name 映射"""
- from app.models import get_session, EdgeDevice
- db = get_session()
- try:
- devices = db.query(EdgeDevice.device_id, EdgeDevice.device_name).all()
- return {d.device_id: (d.device_name or d.device_id) for d in devices}
- except Exception as e:
- logger.warning(f"查询边缘设备名称失败: {e}")
- return {}
- finally:
- db.close()
-
-
def _format_resp_time(minutes: float) -> str:
"""格式化响应时长"""
if minutes < 60:
@@ -45,74 +43,113 @@ def _format_resp_time(minutes: float) -> str:
async def generate_daily_report() -> Optional[str]:
- """生成昨日告警日报 Markdown 内容(含按边缘节点分组)"""
- from app.services.alarm_event_service import get_alarm_event_service
- from app.services.camera_name_service import get_camera_name_service
+ """生成昨日工单日报 Markdown 内容"""
from app.utils.timezone import beijing_now
- svc = get_alarm_event_service()
- camera_svc = get_camera_name_service()
-
now = beijing_now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_start = today_start - timedelta(days=1)
day_before_start = today_start - timedelta(days=2)
- # 查询昨日和前日全量告警
- yesterday_alarms, yesterday_total = svc.get_alarms(
- start_time=yesterday_start, end_time=today_start, page=1, page_size=10000
- )
- _, prev_total = svc.get_alarms(
- start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1
- )
-
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
- # 无告警时发送简短通知
- if yesterday_total == 0:
- return (
- f"**AI安防日报 — {date_str}({weekday})**\n\n"
- f">昨日告警总计:**0** 条\n"
- f">系统运行正常,无异常事件"
+ # 查询 IoT 工单
+ try:
+ from app.models_iot import (
+ get_iot_session, IotOpsOrder,
+ IotOpsOrderSecurityExt, IotOpsOrderCleanExt,
)
+ except Exception as e:
+ logger.error(f"IoT数据库不可用,日报生成失败: {e}")
+ return None
- # ---- 全局统计 ----
- type_counter: Counter = Counter()
- device_counter: Counter = Counter()
- handle_done = 0
- handle_ignored = 0
- handle_unhandled = 0
- false_alarm = 0
+ db = get_iot_session()
+ try:
+ # 昨日工单
+ yesterday_orders = db.query(IotOpsOrder).filter(
+ IotOpsOrder.create_time >= yesterday_start,
+ IotOpsOrder.create_time < today_start,
+ IotOpsOrder.deleted == 0,
+ ).all()
+ yesterday_total = len(yesterday_orders)
+
+ # 前日工单(用于环比)
+ prev_total = db.query(IotOpsOrder).filter(
+ IotOpsOrder.create_time >= day_before_start,
+ IotOpsOrder.create_time < yesterday_start,
+ IotOpsOrder.deleted == 0,
+ ).count()
+
+ if yesterday_total == 0:
+ return (
+ f"**物业工单日报 — {date_str}({weekday})**\n\n"
+ f">昨日工单总计:**0** 条\n"
+ f">系统运行正常,无工单"
+ )
+
+ # 收集 order_ids
+ order_ids = [o.id for o in yesterday_orders]
+
+ # 批量查安保扩展
+ sec_ext_map = {}
+ sec_exts = db.query(IotOpsOrderSecurityExt).filter(
+ IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
+ IotOpsOrderSecurityExt.deleted == 0,
+ ).all()
+ sec_ext_map = {e.ops_order_id: e for e in sec_exts}
+
+ # 批量查保洁扩展
+ clean_ext_map = {}
+ clean_exts = db.query(IotOpsOrderCleanExt).filter(
+ IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
+ IotOpsOrderCleanExt.deleted == 0,
+ ).all()
+ clean_ext_map = {e.ops_order_id: e for e in clean_exts}
+
+ except Exception as e:
+ logger.error(f"查询IoT工单失败: {e}", exc_info=True)
+ return None
+ finally:
+ db.close()
+
+ # ---- 统计 ----
+ type_count = {"SECURITY": 0, "CLEAN": 0}
+ status_count = Counter()
+ alarm_type_count = Counter()
+ camera_counter = Counter()
+ false_alarm_count = 0
response_times: List[float] = []
+ cleaning_type_count = Counter()
- # ---- 按边缘节点分组 ----
- edge_groups: Dict[str, List] = defaultdict(list)
+ for o in yesterday_orders:
+ ot = o.order_type or "SECURITY"
+ type_count[ot] = type_count.get(ot, 0) + 1
+ status_count[o.status or "PENDING"] += 1
- for a in yesterday_alarms:
- type_counter[a.alarm_type] += 1
- device_counter[a.device_id] += 1
+ # 安保统计
+ sec_ext = sec_ext_map.get(o.id)
+ if sec_ext:
+ if sec_ext.alarm_type:
+ alarm_type_count[sec_ext.alarm_type] += 1
+ if sec_ext.camera_name:
+ camera_counter[sec_ext.camera_name] += 1
+ elif sec_ext.camera_id:
+ camera_counter[sec_ext.camera_id] += 1
+ if sec_ext.false_alarm == 1:
+ false_alarm_count += 1
+ # 响应时长:dispatched → confirmed
+ if sec_ext.dispatched_time and sec_ext.confirmed_time:
+ delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
+ if 0 <= delta <= 360:
+ response_times.append(delta)
- # 按 edge_node_id 分组(无 edge_node_id 归入 "unknown")
- edge_key = a.edge_node_id or "unknown"
- edge_groups[edge_key].append(a)
+ # 保洁统计
+ clean_ext = clean_ext_map.get(o.id)
+ if clean_ext and clean_ext.cleaning_type:
+ cleaning_type_count[clean_ext.cleaning_type] += 1
- if a.handle_status == "DONE":
- handle_done += 1
- elif a.handle_status == "IGNORED":
- handle_ignored += 1
- else:
- handle_unhandled += 1
-
- if a.alarm_status == "FALSE":
- false_alarm += 1
-
- if a.handled_at and a.event_time:
- delta = (a.handled_at - a.event_time).total_seconds() / 60.0
- if 0 <= delta <= 360: # 排除 >6h 异常值
- response_times.append(delta)
-
- # 环比变化
+ # 环比
if prev_total > 0:
change_pct = (yesterday_total - prev_total) / prev_total * 100
if change_pct > 0:
@@ -122,95 +159,62 @@ async def generate_daily_report() -> Optional[str]:
else:
change_str = f"前日{prev_total}条,持平"
else:
- change_str = "前日无告警"
+ change_str = "前日无工单"
# 平均响应时长
resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
- # 设备 Top5 — 批量获取摄像头名称
- top5_devices = device_counter.most_common(5)
- all_device_ids = list({d[0] for d in top5_devices})
- # 也收集各边缘节点的 top 设备 ID
- for edge_key, alarms in edge_groups.items():
- edge_dev_counter = Counter(a.device_id for a in alarms)
- for dev_id, _ in edge_dev_counter.most_common(3):
- if dev_id not in all_device_ids:
- all_device_ids.append(dev_id)
-
- try:
- name_map = await camera_svc.get_display_names_batch(all_device_ids)
- except Exception as e:
- logger.warning(f"日报获取摄像头名称失败: {e}")
- name_map = {}
-
- # 边缘设备名称映射
- edge_name_map = _get_edge_name_map()
+ # 待处理数量
+ pending_count = sum(
+ 1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED")
+ )
+ completed_count = status_count.get("COMPLETED", 0)
+ cancelled_count = status_count.get("CANCELLED", 0)
# ==================== 组装 Markdown ====================
lines = [
- f"**AI安防日报 — {date_str}({weekday})**",
+ f"**物业工单日报 — {date_str}({weekday})**",
"",
- f">昨日告警总计:{yesterday_total} 条({change_str})",
- f">待处理:{handle_unhandled} 条",
- f">已处理:{handle_done}条 | 已忽略:{handle_ignored}条 | 误报:{false_alarm}条",
- f">平均响应:{resp_str}",
+ f">昨日工单总计:{yesterday_total} 条({change_str})",
]
- # 按类型分布
- if type_counter:
+ # 按工单类型
+ sec_count = type_count.get("SECURITY", 0)
+ clean_count = type_count.get("CLEAN", 0)
+ if sec_count and clean_count:
+ lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}条")
+ elif sec_count:
+ lines.append(f">安保工单:{sec_count}条")
+ elif clean_count:
+ lines.append(f">保洁工单:{clean_count}条")
+
+ lines.append(f">待处理:{pending_count} 条 | "
+ f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}条")
+ lines.append(f">平均响应:{resp_str}")
+
+ # 安保告警类型分布
+ if alarm_type_count:
lines.append("")
- lines.append("**按类型分布**")
- for alarm_type, count in type_counter.most_common():
+ lines.append("**安保告警类型分布**")
+ for alarm_type, count in alarm_type_count.most_common():
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
lines.append(f">{type_name}:{count}条")
- # 设备 Top5
- if top5_devices:
+ # 保洁类型分布
+ if cleaning_type_count:
lines.append("")
- lines.append("**告警设备 Top5**")
- for i, (device_id, count) in enumerate(top5_devices, 1):
- display_name = name_map.get(device_id, device_id)
- lines.append(f">{i}. {display_name} — {count}条")
+ lines.append("**保洁类型分布**")
+ for ct, count in cleaning_type_count.most_common():
+ ct_name = CLEANING_TYPE_NAMES.get(ct, ct)
+ lines.append(f">{ct_name}:{count}条")
- # ---- 按边缘节点分组汇总 ----
- if len(edge_groups) > 1 or (len(edge_groups) == 1 and "unknown" not in edge_groups):
+ # 摄像头 Top5
+ top5_cameras = camera_counter.most_common(5)
+ if top5_cameras:
lines.append("")
- lines.append("**各边缘节点汇总**")
-
- # 按告警数降序排列
- sorted_edges = sorted(edge_groups.items(), key=lambda x: len(x[1]), reverse=True)
-
- for edge_key, alarms in sorted_edges:
- edge_name = edge_name_map.get(edge_key, edge_key)
- if edge_key == "unknown":
- edge_name = "未知节点"
-
- edge_total = len(alarms)
- edge_type_counter = Counter(a.alarm_type for a in alarms)
- edge_unhandled = sum(1 for a in alarms if a.handle_status not in ("DONE", "IGNORED"))
- edge_false = sum(1 for a in alarms if a.alarm_status == "FALSE")
-
- # 类型分布简写
- type_parts = []
- for at, ac in edge_type_counter.most_common():
- type_parts.append(f"{ALARM_TYPE_NAMES.get(at, at)}{ac}条")
- type_summary = "、".join(type_parts)
-
- # 该节点的 Top3 摄像头
- edge_dev_counter = Counter(a.device_id for a in alarms)
- top3 = edge_dev_counter.most_common(3)
- cam_parts = []
- for dev_id, dev_count in top3:
- cam_name = name_map.get(dev_id, dev_id)
- cam_parts.append(f"{cam_name}({dev_count})")
- cam_summary = "、".join(cam_parts)
-
- lines.append(f"")
- lines.append(f">{edge_name}:{edge_total} 条")
- lines.append(f"> 类型:{type_summary}")
- if edge_unhandled > 0:
- lines.append(f"> 待处理:{edge_unhandled} 条 误报:{edge_false}条")
- lines.append(f"> Top设备:{cam_summary}")
+ lines.append("**告警摄像头 Top5**")
+ for i, (cam_name, count) in enumerate(top5_cameras, 1):
+ lines.append(f">{i}. {cam_name} — {count}条")
return "\n".join(lines)
diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py
index eeed834..4a6bd14 100644
--- a/app/services/notify_dispatch.py
+++ b/app/services/notify_dispatch.py
@@ -404,13 +404,22 @@ def _get_permanent_url(snapshot_url: str) -> str:
def _save_order_id(alarm_id: str, order_id: str):
- """将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)"""
+ """将工单ID保存到 wechat_card_state + alarm_event_ext"""
db = get_session()
try:
+ # 写 wechat_card_state(order_id ↔ alarm_id 映射)
+ from app.models import WechatCardState
+ card_state = WechatCardState(
+ order_id=str(order_id),
+ alarm_id=alarm_id,
+ )
+ db.merge(card_state)
+
+ # 写 alarm_event_ext(保持兼容)
ext = AlarmEventExt(
alarm_id=alarm_id,
ext_type="WORK_ORDER",
- ext_data={"order_id": order_id},
+ ext_data={"order_id": str(order_id)},
)
db.add(ext)
db.commit()
diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py
index 5d512d4..034cdf4 100644
--- a/app/services/wechat_service.py
+++ b/app/services/wechat_service.py
@@ -42,7 +42,7 @@ class WeChatService:
self._token_expire_at = 0
self._service_base_url = ""
# 缓存 response_code,用于更新卡片状态
- # key: task_id (alarm_id), value: response_code
+ # key: order_id, value: response_code
self._response_codes: Dict[str, str] = {}
def init(self, config):
@@ -94,53 +94,85 @@ class WeChatService:
logger.info("企微 access_token 已更新")
return self._access_token
- def save_response_code(self, task_id: str, response_code: str):
- """保存卡片的 response_code(内存缓存 + 数据库持久化)"""
- self._response_codes[task_id] = response_code
+ def save_response_code(self, order_id: str, response_code: str, alarm_id: str = ""):
+ """保存卡片的 response_code 到 wechat_card_state(以 order_id 为主键)"""
+ self._response_codes[order_id] = response_code
try:
- from app.models import get_session, AlarmEventExt
+ from app.models import get_session, WechatCardState
db = get_session()
try:
- ext = db.query(AlarmEventExt).filter(
- AlarmEventExt.alarm_id == task_id,
- AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE",
+ state = db.query(WechatCardState).filter(
+ WechatCardState.order_id == order_id,
).first()
- if ext:
- ext.ext_data = {"response_code": response_code}
+ if state:
+ state.response_code = response_code
+ if alarm_id:
+ state.alarm_id = alarm_id
else:
- ext = AlarmEventExt(
- alarm_id=task_id,
- ext_type="WECHAT_RESPONSE_CODE",
- ext_data={"response_code": response_code},
+ state = WechatCardState(
+ order_id=order_id,
+ response_code=response_code,
+ alarm_id=alarm_id or None,
)
- db.add(ext)
+ db.add(state)
db.commit()
finally:
db.close()
except Exception as e:
logger.warning(f"持久化 response_code 失败: {e}")
- def get_response_code(self, task_id: str) -> Optional[str]:
- """获取 response_code(优先内存缓存,回退数据库查询)"""
- code = self._response_codes.pop(task_id, None)
+ def get_response_code(self, order_id: str) -> Optional[str]:
+ """获取 response_code(优先内存缓存,回退数据库查询 wechat_card_state)"""
+ code = self._response_codes.pop(order_id, None)
if code:
return code
try:
- from app.models import get_session, AlarmEventExt
+ from app.models import get_session, WechatCardState
db = get_session()
try:
- ext = db.query(AlarmEventExt).filter(
- AlarmEventExt.alarm_id == task_id,
- AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE",
+ state = db.query(WechatCardState).filter(
+ WechatCardState.order_id == order_id,
).first()
- if ext and ext.ext_data:
- return ext.ext_data.get("response_code", "")
+ if state and state.response_code:
+ return state.response_code
finally:
db.close()
except Exception as e:
logger.warning(f"查询 response_code 失败: {e}")
return None
+ def get_alarm_id_for_order(self, order_id: str) -> str:
+ """从 wechat_card_state 查 order_id → alarm_id"""
+ try:
+ from app.models import get_session, WechatCardState
+ db = get_session()
+ try:
+ state = db.query(WechatCardState).filter(
+ WechatCardState.order_id == order_id,
+ ).first()
+ return state.alarm_id or "" if state else ""
+ finally:
+ db.close()
+ except Exception as e:
+ logger.warning(f"查询 alarm_id 失败: order={order_id}, error={e}")
+ return ""
+
+ def get_order_id_for_alarm(self, alarm_id: str) -> str:
+ """从 wechat_card_state 查 alarm_id → order_id"""
+ try:
+ from app.models import get_session, WechatCardState
+ db = get_session()
+ try:
+ state = db.query(WechatCardState).filter(
+ WechatCardState.alarm_id == alarm_id,
+ ).first()
+ return state.order_id or "" if state else ""
+ finally:
+ db.close()
+ except Exception as e:
+ logger.warning(f"查询 order_id 失败: alarm={alarm_id}, error={e}")
+ return ""
+
# ==================== 媒体上传 ====================
async def upload_media(self, image_data: bytes, filename: str = "alarm.jpg") -> Optional[str]:
@@ -231,13 +263,14 @@ class WeChatService:
async def send_alarm_card(
self,
user_ids: List[str],
- alarm_id: str,
+ order_id: str,
alarm_type: str,
area_name: str,
camera_name: str,
description: str,
event_time: str,
alarm_level: int = 2,
+ alarm_id: str = "",
) -> bool:
"""
发送按钮交互型模板卡片(个人消息)
@@ -261,7 +294,7 @@ class WeChatService:
"agentid": self.agent_id_int,
"template_card": {
"card_type": "button_interaction",
- "task_id": alarm_id,
+ "task_id": order_id,
"source": {
"desc": "AI安防告警",
"desc_color": 3 if alarm_level <= 1 else 0,
@@ -279,18 +312,18 @@ class WeChatService:
],
"card_action": {
"type": 1,
- "url": self._alarm_detail_url(alarm_id) or "https://work.weixin.qq.com",
+ "url": self._alarm_detail_url(alarm_id or order_id) or "https://work.weixin.qq.com",
},
"button_list": [
{
"text": "确认接单",
"style": 1,
- "key": f"confirm_{alarm_id}",
+ "key": f"confirm_{order_id}",
},
{
"text": "误报忽略",
"style": 2,
- "key": f"ignore_{alarm_id}",
+ "key": f"ignore_{order_id}",
},
],
},
@@ -307,9 +340,9 @@ class WeChatService:
response_code = data.get("response_code", "")
if response_code:
- self.save_response_code(alarm_id, response_code)
+ self.save_response_code(order_id, response_code, alarm_id=alarm_id)
- logger.info(f"企微卡片已发送: alarm={alarm_id}, users={user_ids}")
+ logger.info(f"企微卡片已发送: order={order_id}, alarm={alarm_id}, users={user_ids}")
return True
except Exception as e:
@@ -320,7 +353,7 @@ class WeChatService:
self,
response_code: str,
user_ids: List[str],
- alarm_id: str,
+ order_id: str,
operator_name: str = "",
) -> bool:
"""
@@ -341,7 +374,7 @@ class WeChatService:
"response_code": response_code,
"template_card": {
"card_type": "button_interaction",
- "task_id": alarm_id,
+ "task_id": order_id,
"source": {
"desc": "AI安防工单 - 处理中",
"desc_color": 1,
@@ -352,18 +385,18 @@ class WeChatService:
"sub_title_text": "请在对话框中回复处理结果,或点击按钮快速操作",
"card_action": {
"type": 1,
- "url": self._alarm_detail_url(alarm_id) or "https://work.weixin.qq.com",
+ "url": self._alarm_detail_url(order_id) or "https://work.weixin.qq.com",
},
"button_list": [
{
"text": "已处理完成",
"style": 1,
- "key": f"complete_{alarm_id}",
+ "key": f"complete_{order_id}",
},
{
"text": "标记误报",
"style": 2,
- "key": f"false_{alarm_id}",
+ "key": f"false_{order_id}",
},
],
},
@@ -378,7 +411,7 @@ class WeChatService:
logger.error(f"更新卡片到步骤2失败: {data}")
return False
- logger.info(f"卡片已更新到步骤2: alarm={alarm_id}, operator={operator_name}")
+ logger.info(f"卡片已更新到步骤2: order={order_id}, operator={operator_name}")
return True
except Exception as e:
@@ -389,7 +422,7 @@ class WeChatService:
self,
response_code: str,
user_ids: List[str],
- alarm_id: str,
+ order_id: str,
action: str,
operator_name: str = "",
) -> bool:
@@ -435,7 +468,7 @@ class WeChatService:
logger.error(f"更新卡片终态失败: {data}")
return False
- logger.info(f"卡片已更新到终态: alarm={alarm_id}, action={action}")
+ logger.info(f"卡片已更新到终态: order={order_id}, action={action}")
return True
except Exception as e: