Files
iot-device-management-service/app/routers/wechat_callback.py
16337 97dd664f2e 功能:卡片按钮回调改为调 IoT 接口,方案A严格模式
- 确认接单 → 调 IoT /confirm
- 误报忽略 → 调 IoT /false-alarm
- 已处理 → 调 IoT /submit
- 标记误报 → 调 IoT /false-alarm
- IoT 调用失败时降级直接更新告警状态
- 工单未启用时保持原有直接更新逻辑
2026-03-23 11:54:08 +08:00

307 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
企微回调路由
处理安保人员在企微卡片上的操作(前往处理/误报忽略)。
接收企微用户消息并路由到交互Agent。
"""
import asyncio
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import PlainTextResponse
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.utils.logger import logger
router = APIRouter(prefix="/api/wechat", tags=["企微回调"])
# ==================== 交互Agent消息回调 ====================
@router.get("/agent/callback")
async def wechat_agent_verify(
msg_signature: str = Query(...),
timestamp: str = Query(...),
nonce: str = Query(...),
echostr: str = Query(...),
):
"""企微回调URL验证首次配置时调用"""
from app.services.wechat_crypto import WeChatCrypto
try:
crypto = WeChatCrypto()
echo = crypto.verify_url(msg_signature, timestamp, nonce, echostr)
return PlainTextResponse(content=echo)
except Exception as e:
logger.error(f"企微URL验证失败: {e}")
return PlainTextResponse(content="", status_code=403)
@router.post("/agent/callback")
async def wechat_agent_message(
request: Request,
msg_signature: str = Query(...),
timestamp: str = Query(...),
nonce: str = Query(...),
):
"""接收企微用户消息路由到交互Agent"""
body = await request.body()
from app.services.wechat_crypto import WeChatCrypto
try:
crypto = WeChatCrypto()
msg = crypto.decrypt_message(body, msg_signature, timestamp, nonce)
except Exception as e:
logger.error(f"企微消息解密失败: {e}")
return PlainTextResponse(content="success")
msg_type = msg.get("MsgType", "")
event = msg.get("Event", "")
# ---- 模板卡片按钮点击事件 ----
if msg_type == "event" and event == "template_card_event":
asyncio.create_task(_process_card_button_click(msg))
return PlainTextResponse(content="success")
# ---- 文本消息 → Agent 对话 ----
if msg_type == "text":
user_id = msg.get("FromUserName", "")
content = msg.get("Content", "")
logger.info(f"收到企微消息: user={user_id}, content={content[:50]}")
asyncio.create_task(_process_agent_message(user_id, content))
return PlainTextResponse(content="success")
# ---- 图片消息 → Agent 图片分析 ----
if msg_type == "image":
user_id = msg.get("FromUserName", "")
media_id = msg.get("MediaId", "")
logger.info(f"收到企微图片: user={user_id}, media_id={media_id}")
asyncio.create_task(_process_agent_image(user_id, media_id))
return PlainTextResponse(content="success")
return PlainTextResponse(content="success")
async def _process_agent_message(user_id: str, content: str):
"""异步处理文字消息并主动回复"""
try:
from app.services.agent_dispatcher import get_agent_dispatcher
from app.services.wechat_service import get_wechat_service
dispatcher = get_agent_dispatcher()
reply = await dispatcher.handle_message(user_id, content)
wechat = get_wechat_service()
await wechat.send_text_message(user_id, reply)
except Exception as e:
logger.error(f"Agent消息处理失败: user={user_id}, error={e}", exc_info=True)
async def _process_agent_image(user_id: str, media_id: str):
"""异步处理图片消息:先回复"正在分析",再调 VLM 分析"""
try:
from app.services.agent_dispatcher import get_agent_dispatcher
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
# 先回复提示,避免用户等太久
await wechat.send_text_message(user_id, "正在分析图片,请稍候...")
dispatcher = get_agent_dispatcher()
reply = await dispatcher.handle_image(user_id, media_id)
await wechat.send_text_message(user_id, reply)
except Exception as e:
logger.error(f"Agent图片处理失败: user={user_id}, error={e}", exc_info=True)
try:
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
await wechat.send_text_message(user_id, "图片处理失败,请稍后重试。")
except Exception:
pass
async def _process_card_button_click(msg: dict):
"""
处理模板卡片按钮点击事件(两步状态机)
第一步按钮:
- confirm_{alarm_id}: 确认接单 → 更新状态为处理中,创建工单,卡片更新到第二步
- ignore_{alarm_id}: 误报忽略 → 终态,自动结单
第二步按钮:
- complete_{alarm_id}: 已处理完成 → 终态,自动结单
- false_{alarm_id}: 标记误报 → 终态,自动结单
终态按钮:
- done_{alarm_id}: 已完成,忽略
"""
try:
from app.services.wechat_service import get_wechat_service
from app.services.work_order_client import get_work_order_client
user_id = msg.get("FromUserName", "")
event_key = msg.get("EventKey", "")
task_id = msg.get("TaskId", "")
response_code = msg.get("ResponseCode", "")
logger.info(
f"卡片按钮点击: user={user_id}, key={event_key}, task={task_id}"
)
# 解析 action 和 alarm_id
if event_key.startswith("confirm_"):
action = "confirm"
alarm_id = event_key[len("confirm_"):]
elif event_key.startswith("ignore_"):
action = "ignore"
alarm_id = event_key[len("ignore_"):]
elif event_key.startswith("complete_"):
action = "complete"
alarm_id = event_key[len("complete_"):]
elif event_key.startswith("false_"):
action = "false"
alarm_id = event_key[len("false_"):]
elif event_key.startswith("done_"):
return # 终态按钮,忽略
else:
logger.warning(f"未知的按钮 key: {event_key}")
return
wechat = get_wechat_service()
wo_client = get_work_order_client()
# 保存 response_code 供后续 IoT 回调时更新卡片
if response_code:
wechat.save_response_code(alarm_id, response_code)
# 获取关联的工单ID
order_id = _get_order_id_for_alarm(alarm_id)
# ---- 确认接单:调 IoT /confirm ----
if action == "confirm":
if order_id and wo_client.enabled:
success = await wo_client.confirm_order(order_id)
if success:
logger.info(f"IoT工单已确认: alarm={alarm_id}, order={order_id}, by={user_id}")
else:
logger.warning(f"IoT工单确认失败: alarm={alarm_id}, order={order_id}")
# IoT 调用失败时降级直接更新
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
handle_status="HANDLING", handler=user_id, remark="企微确认(IoT降级)")
else:
# 工单未启用时直接更新告警状态
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
handle_status="HANDLING", handler=user_id, remark="企微确认接单")
# 更新卡片提示去H5处理
if response_code:
await wechat.update_alarm_card_step2(
response_code=response_code,
user_ids=[user_id],
alarm_id=alarm_id,
operator_name=user_id,
)
# ---- 误报忽略:调 IoT /false-alarm ----
elif action == "ignore":
if order_id and wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if success:
logger.info(f"IoT工单已标记误报: alarm={alarm_id}, order={order_id}, by={user_id}")
else:
logger.warning(f"IoT误报标记失败: alarm={alarm_id}, order={order_id}")
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)")
else:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微误报忽略")
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="ignore", operator_name=user_id,
)
# ---- 已处理完成step2按钮保留兼容调 IoT /submit ----
elif action == "complete":
if order_id and wo_client.enabled:
success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}")
if success:
logger.info(f"IoT工单已提交: alarm={alarm_id}, order={order_id}")
else:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id, remark="企微完成(IoT降级)")
else:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id, remark="企微已处理")
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="complete", operator_name=user_id,
)
# ---- 标记误报step2按钮保留兼容----
elif action == "false":
if order_id and wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if success:
logger.info(f"IoT工单已标记误报: alarm={alarm_id}, order={order_id}")
else:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)")
else:
service = get_alarm_event_service()
service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微标记误报")
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="false", operator_name=user_id,
)
except Exception as e:
logger.error(f"处理卡片按钮点击失败: {e}", exc_info=True)
def _get_order_id_for_alarm(alarm_id: str) -> str:
"""从 alarm_event_ext 中获取关联的工单ID"""
from app.models import get_session, AlarmEventExt
db = get_session()
try:
ext = db.query(AlarmEventExt).filter(
AlarmEventExt.alarm_id == alarm_id,
AlarmEventExt.ext_type == "WORK_ORDER",
).first()
if ext and ext.ext_data:
return ext.ext_data.get("order_id", "")
return ""
except Exception as e:
logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}")
return ""
finally:
db.close()
# ==================== Agent测试接口开发用 ====================
@router.post("/agent/test")
async def test_agent_message(
user_id: str = Query(default="test_user"),
content: str = Query(..., description="测试消息内容"),
):
"""测试Agent对话开发用无加密直接返回回复"""
from app.services.agent_dispatcher import get_agent_dispatcher
dispatcher = get_agent_dispatcher()
reply = await dispatcher.handle_message(user_id, content)
return YudaoResponse.success({"user_id": user_id, "content": content, "reply": reply})