Files
iot-device-management-service/app/routers/wechat_callback.py
16337 9143022ee8 fix: 区分误报(IGNORED)和自动结单(DONE)状态
- VLM误报和手动忽略的handle_status改为IGNORED
- 自动结单(resolve_alarm)检查IGNORED状态,不覆盖误报
- 前端忽略操作兼容转换时自动设置handleStatus=IGNORED

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 09:12:14 +08:00

205 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
企微回调路由
处理安保人员在企微卡片上的操作(前往处理/已处理/误报忽略)。
提供告警详情接口供 H5 页面使用。
接收企微用户消息并路由到交互Agent。
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.models import get_session, AlarmEvent, AlarmLlmAnalysis
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.utils.logger import logger
router = APIRouter(prefix="/api/wechat", tags=["企微回调"])
class AlarmActionRequest(BaseModel):
"""企微卡片操作请求"""
alarm_id: str
action: str # confirm / complete / ignore
operator_uid: str # 企微 userid
remark: Optional[str] = None
@router.get("/alarm_detail")
async def get_alarm_detail(alarm_id: str = Query(..., description="告警ID")):
"""
告警详情接口(供 H5 页面使用,无认证)
返回告警基本信息 + VLM 分析描述
"""
db = get_session()
try:
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
if not alarm:
return YudaoResponse.error(404, "告警不存在")
# 查 VLM 分析结果
vlm_desc = ""
analysis = db.query(AlarmLlmAnalysis).filter(
AlarmLlmAnalysis.alarm_id == alarm_id
).order_by(AlarmLlmAnalysis.id.desc()).first()
if analysis:
vlm_desc = analysis.summary or ""
return YudaoResponse.success({
"alarm_id": alarm.alarm_id,
"alarm_type": alarm.alarm_type,
"device_id": alarm.device_id,
"scene_id": alarm.scene_id,
"event_time": alarm.event_time.strftime('%Y-%m-%d %H:%M:%S') if alarm.event_time else "",
"alarm_level": alarm.alarm_level,
"alarm_status": alarm.alarm_status,
"handle_status": alarm.handle_status,
"snapshot_url": alarm.snapshot_url or "",
"handler": alarm.handler or "",
"handle_remark": alarm.handle_remark or "",
"vlm_description": vlm_desc,
})
finally:
db.close()
@router.post("/callback/alarm_action")
async def alarm_action_callback(
req: AlarmActionRequest,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
企微告警操作回调(无认证,由 H5 页面调用)
action:
- confirm: 前往处理 → handle_status=HANDLING
- complete: 已处理完成 → handle_status=DONE, alarm_status=CLOSED
- ignore: 误报忽略 → alarm_status=FALSE, handle_status=DONE
"""
action_map = {
"confirm": {
"alarm_status": "CONFIRMED",
"handle_status": "HANDLING",
"remark": "前往处理",
},
"complete": {
"alarm_status": "CLOSED",
"handle_status": "DONE",
"remark": "手动结单",
},
"ignore": {
"alarm_status": "FALSE",
"handle_status": "IGNORED",
"remark": "标记误报",
},
}
action_cfg = action_map.get(req.action)
if not action_cfg:
return YudaoResponse.error(400, f"无效操作: {req.action}")
result = service.handle_alarm(
alarm_id=req.alarm_id,
alarm_status=action_cfg["alarm_status"],
handle_status=action_cfg["handle_status"],
handler=req.operator_uid,
remark=req.remark or action_cfg["remark"],
)
if not result:
return YudaoResponse.error(404, "告警不存在")
logger.info(
f"企微操作: alarm={req.alarm_id}, action={req.action}, "
f"operator={req.operator_uid}"
)
return YudaoResponse.success(True)
# ==================== 交互Agent消息回调 ====================
@router.get("/agent/callback")
async def wechat_agent_verify(
msg_signature: str = Query(...),
timestamp: str = Query(...),
nonce: str = Query(...),
echostr: str = Query(...),
):
"""企微回调URL验证首次配置时调用"""
from app.services.wechat_crypto import WeChatCrypto
try:
crypto = WeChatCrypto()
echo = crypto.verify_url(msg_signature, timestamp, nonce, echostr)
return PlainTextResponse(content=echo)
except Exception as e:
logger.error(f"企微URL验证失败: {e}")
return PlainTextResponse(content="", status_code=403)
@router.post("/agent/callback")
async def wechat_agent_message(
request: Request,
msg_signature: str = Query(...),
timestamp: str = Query(...),
nonce: str = Query(...),
):
"""接收企微用户消息路由到交互Agent"""
body = await request.body()
from app.services.wechat_crypto import WeChatCrypto
try:
crypto = WeChatCrypto()
msg = crypto.decrypt_message(body, msg_signature, timestamp, nonce)
except Exception as e:
logger.error(f"企微消息解密失败: {e}")
return PlainTextResponse(content="success")
# 只处理文本消息
if msg.get("MsgType") != "text":
return PlainTextResponse(content="success")
user_id = msg.get("FromUserName", "")
content = msg.get("Content", "")
logger.info(f"收到企微消息: user={user_id}, content={content[:50]}")
# 异步处理企微要求5秒内响应通过主动消息回复
asyncio.create_task(_process_agent_message(user_id, content))
return PlainTextResponse(content="success")
async def _process_agent_message(user_id: str, content: str):
"""异步处理消息并主动回复"""
try:
from app.services.agent_dispatcher import get_agent_dispatcher
from app.services.wechat_service import get_wechat_service
dispatcher = get_agent_dispatcher()
reply = await dispatcher.handle_message(user_id, content)
wechat = get_wechat_service()
await wechat.send_text_message(user_id, reply)
except Exception as e:
logger.error(f"Agent消息处理失败: user={user_id}, error={e}", exc_info=True)
# ==================== Agent测试接口开发用 ====================
@router.post("/agent/test")
async def test_agent_message(
user_id: str = Query(default="test_user"),
content: str = Query(..., description="测试消息内容"),
):
"""测试Agent对话开发用无加密直接返回回复"""
from app.services.agent_dispatcher import get_agent_dispatcher
dispatcher = get_agent_dispatcher()
reply = await dispatcher.handle_message(user_id, content)
return YudaoResponse.success({"user_id": user_id, "content": content, "reply": reply})